Compare commits

...

4 Commits

Author SHA1 Message Date
riwiwa
1df12b1755 update readme 2026-02-09 19:54:02 -08:00
c4314456ae add spotify import progress bar 2026-02-09 19:51:30 -08:00
7fe4d02721 clean up backend spotify import functionality 2026-02-09 18:34:49 -08:00
d35e7bffd3 remove db name environment variable 2026-02-09 18:34:37 -08:00
7 changed files with 361 additions and 215 deletions

View File

@@ -8,7 +8,7 @@
### Roadmap: ### Roadmap:
- Ability to import all listening statistics and scrobbles from: \[In Progress\] - Ability to import all listening statistics and scrobbles from: \[In Progress\]
- LastFM \[Complete\] - LastFM \[Complete\]
- Spotify \[In Progress\] - Spotify \[Complete\]
- Apple Music \[Planned\] - Apple Music \[Planned\]
- WebUI \[In Progress\] - WebUI \[In Progress\]

View File

@@ -21,23 +21,23 @@ func CreateAllTables() error {
return CreateSessionsTable() return CreateSessionsTable()
} }
func GetDbUrl(noDBName bool) string { func GetDbUrl(dbName bool) string {
host := os.Getenv("PGHOST") host := os.Getenv("PGHOST")
port := os.Getenv("PGPORT") port := os.Getenv("PGPORT")
user := os.Getenv("PGUSER") user := os.Getenv("PGUSER")
pass := os.Getenv("PGPASSWORD") pass := os.Getenv("PGPASSWORD")
db := os.Getenv("PGDATABASE")
if noDBName { if dbName {
return fmt.Sprintf("postgres://%s:%s@%s:%s", user, pass, host, port) return fmt.Sprintf("postgres://%s:%s@%s:%s/%s",
user, pass, host, port, "muzi")
} else { } else {
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s", user, pass, host, port, db) return fmt.Sprintf("postgres://%s:%s@%s:%s", user, pass, host, port)
} }
} }
func CreateDB() error { func CreateDB() error {
conn, err := pgx.Connect(context.Background(), conn, err := pgx.Connect(context.Background(),
GetDbUrl(true), GetDbUrl(false),
) )
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Cannot connect to PostgreSQL: %v\n", err) fmt.Fprintf(os.Stderr, "Cannot connect to PostgreSQL: %v\n", err)

View File

@@ -22,7 +22,7 @@ func main() {
check("ensuring muzi DB exists", db.CreateDB()) check("ensuring muzi DB exists", db.CreateDB())
var err error var err error
db.Pool, err = pgxpool.New(context.Background(), db.GetDbUrl(false)) db.Pool, err = pgxpool.New(context.Background(), db.GetDbUrl(true))
check("connecting to muzi database", err) check("connecting to muzi database", err)
defer db.Pool.Close() defer db.Pool.Close()

View File

@@ -12,6 +12,8 @@ import (
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
) )
const batchSize = 1000
type SpotifyTrack struct { type SpotifyTrack struct {
Timestamp string `json:"ts"` Timestamp string `json:"ts"`
Played int `json:"ms_played"` Played int `json:"ms_played"`
@@ -20,25 +22,30 @@ type SpotifyTrack struct {
Album string `json:"master_metadata_album_album_name"` Album string `json:"master_metadata_album_album_name"`
} }
type existingTrack struct { type trackSource struct {
tracks []SpotifyTrack
tracksToSkip map[string]struct{}
idx int
userId int
}
type dbTrack struct {
Timestamp time.Time Timestamp time.Time
SongName string SongName string
Artist string Artist string
} }
// trackSource implements pgx.CopyFromSource for efficient bulk inserts
type trackSource struct {
tracks []SpotifyTrack
existing map[string]struct{}
idx int
userId int
}
func (s *trackSource) Next() bool { func (s *trackSource) Next() bool {
for s.idx < len(s.tracks) { for s.idx < len(s.tracks) {
t := s.tracks[s.idx] t := s.tracks[s.idx]
key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, t.Timestamp) ts, err := normalizeTs(t.Timestamp)
if _, exists := s.existing[key]; exists { if err != nil {
fmt.Fprintf(os.Stderr, "Error normalizing timestamp: %v\n", err)
s.idx++
continue
}
key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, ts)
if _, shouldSkip := s.tracksToSkip[key]; shouldSkip {
s.idx++ s.idx++
continue continue
} }
@@ -51,7 +58,6 @@ func (s *trackSource) Next() bool {
func (s *trackSource) Values() ([]any, error) { func (s *trackSource) Values() ([]any, error) {
// idx is already incremented in Next(), so use idx-1 // idx is already incremented in Next(), so use idx-1
t := s.tracks[s.idx-1] t := s.tracks[s.idx-1]
// parse spotify string timestamp to a real time object
ts, err := time.Parse(time.RFC3339Nano, t.Timestamp) ts, err := time.Parse(time.RFC3339Nano, t.Timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -71,48 +77,62 @@ func (s *trackSource) Err() error {
return nil return nil
} }
// find tracks with the same artist and name in a 20 second +- timeframe func normalizeTs(ts string) (string, error) {
t, err := time.Parse(time.RFC3339Nano, ts)
if err != nil {
return "", err
}
return t.Format(time.RFC3339Nano), nil
}
func getExistingTracks( func getExistingTracks(
userId int, userId int, tracks []SpotifyTrack,
tracks []SpotifyTrack,
) (map[string]struct{}, error) { ) (map[string]struct{}, error) {
// check for empty track import minTs, maxTs := findTimeRange(tracks)
if len(tracks) == 0 {
return map[string]struct{}{}, nil
}
// find min/max timestamps in this batch to create time window to
// search for duplicates
var minTs, maxTs time.Time
// go through each track (t) in the array
for _, t := range tracks {
// parse spotify timestamp into operational time datatype
ts, err := time.Parse(time.RFC3339Nano, t.Timestamp)
if err != nil {
continue
}
// if minTs uninitialized or timestamp predates minTs
if minTs.IsZero() || ts.Before(minTs) {
minTs = ts
}
// if timestamp comes after maxTs
if ts.After(maxTs) {
maxTs = ts
}
}
// check if all parses failed, therefore no way to find duplicate by time
if minTs.IsZero() { if minTs.IsZero() {
return map[string]struct{}{}, nil return map[string]struct{}{}, nil
} }
// find all tracks within [min-20s, max+20s] window (duplicates) dbTracks, err := fetchDbTracks(userId, minTs, maxTs)
if err != nil {
return nil, err
}
dbIndex := buildDbTrackIndex(dbTracks)
return findDuplicates(tracks, dbIndex), nil
}
// get the min/max timestamp range for a batch of tracks
func findTimeRange(tracks []SpotifyTrack) (time.Time, time.Time) {
var minTs, maxTs time.Time
for _, t := range tracks {
ts, err := time.Parse(time.RFC3339Nano, t.Timestamp)
if err != nil {
continue
}
if minTs.IsZero() || ts.Before(minTs) {
minTs = ts
}
if ts.After(maxTs) {
maxTs = ts
}
}
return minTs, maxTs
}
/*
get all tracks in the database for a user that have the same timestamp
range as the current batch
*/
func fetchDbTracks(userId int, minTs, maxTs time.Time) ([]dbTrack, error) {
rows, err := db.Pool.Query(context.Background(), rows, err := db.Pool.Query(context.Background(),
`SELECT song_name, artist, timestamp `SELECT song_name, artist, timestamp
FROM history FROM history
WHERE user_id = $1 WHERE user_id = $1
AND timestamp BETWEEN $2 AND $3`, AND timestamp BETWEEN $2 AND $3`,
userId, userId,
// adjust 20 seconds to find duplicates on edges of batch
minTs.Add(-20*time.Second), minTs.Add(-20*time.Second),
maxTs.Add(20*time.Second)) maxTs.Add(20*time.Second))
if err != nil { if err != nil {
@@ -120,102 +140,138 @@ func getExistingTracks(
} }
defer rows.Close() defer rows.Close()
// prepare map to hold duplicate track keys var dbTracks []dbTrack
existing := make(map[string]struct{})
// create array of tracks
var existingTracks []existingTrack
// for each repeat play (-20s +20s)
for rows.Next() { for rows.Next() {
// write the data from json to the track in memory var t dbTrack
var t existingTrack
if err := rows.Scan(&t.SongName, &t.Artist, &t.Timestamp); err != nil { if err := rows.Scan(&t.SongName, &t.Artist, &t.Timestamp); err != nil {
continue continue
} }
// add track in memory to existingTracks array dbTracks = append(dbTracks, t)
existingTracks = append(existingTracks, t)
} }
return dbTracks, nil
}
// index existing tracks by artist|name for O(1) lookup func buildDbTrackIndex(tracks []dbTrack) map[string][]time.Time {
existingIndex := make(map[string][]time.Time) index := make(map[string][]time.Time)
for _, t := range existingTracks { for _, t := range tracks {
key := t.Artist + "|" + t.SongName key := t.Artist + "|" + t.SongName
existingIndex[key] = append(existingIndex[key], t.Timestamp) index[key] = append(index[key], t.Timestamp)
} }
return index
}
// check each new track against indexed existing tracks func findDuplicates(tracks []SpotifyTrack, dbIndex map[string][]time.Time) map[string]struct{} {
for _, newTrack := range tracks { duplicates := make(map[string]struct{})
newTs, err := time.Parse(time.RFC3339Nano, newTrack.Timestamp) seenInBatch := make(map[string]struct{})
for _, track := range tracks {
trackKey, err := createTrackKey(track)
if err != nil { if err != nil {
continue continue
} }
lookupKey := newTrack.Artist + "|" + newTrack.Name // in batch check
if timestamps, found := existingIndex[lookupKey]; found { if _, seen := seenInBatch[trackKey]; seen {
for _, existTs := range timestamps { duplicates[trackKey] = struct{}{}
diff := newTs.Sub(existTs) continue
if diff < 0 { }
diff = -diff seenInBatch[trackKey] = struct{}{}
}
if diff < 20*time.Second { // in db check
key := fmt.Sprintf( lookupKey := fmt.Sprintf("%s|%s", track.Artist, track.Name)
"%s|%s|%s", if dbTimestamps, found := dbIndex[lookupKey]; found {
newTrack.Artist, if isDuplicateWithinWindow(track, dbTimestamps) {
newTrack.Name, duplicates[trackKey] = struct{}{}
newTrack.Timestamp,
)
existing[key] = struct{}{}
break
}
} }
} }
} }
return existing, nil return duplicates
} }
func ImportSpotify(tracks []SpotifyTrack, userId int) error { func createTrackKey(track SpotifyTrack) (string, error) {
totalImported := 0 ts, err := normalizeTs(track.Timestamp)
batchSize := 1000 if err != nil {
return "", err
}
return fmt.Sprintf("%s|%s|%s", track.Artist, track.Name, ts), nil
}
for batchStart := 0; batchStart < len(tracks); batchStart += batchSize { // check if a track timestamp falls < 20 seconds of another
// get the limit of the current batch func isDuplicateWithinWindow(track SpotifyTrack, existingTimestamps []time.Time) bool {
batchEnd := batchStart + batchSize trackTime, err := time.Parse(time.RFC3339Nano, track.Timestamp)
// set limit to track array length in current batch too big if err != nil {
if batchEnd > len(tracks) { return false
batchEnd = len(tracks) }
for _, existingTime := range existingTimestamps {
diff := trackTime.Sub(existingTime)
if diff < 0 {
diff = -diff
} }
if diff < 20*time.Second {
return true
}
}
return false
}
func ImportSpotify(tracks []SpotifyTrack, userId int, progressChan chan ProgressUpdate) error {
totalImported := 0
totalTracks := len(tracks)
batchStart := 0
totalBatches := (totalTracks + batchSize - 1) / batchSize
// Send initial progress update
if progressChan != nil {
progressChan <- ProgressUpdate{
TotalPages: totalBatches,
Status: "running",
}
}
for batchStart < totalTracks {
// cap batchEnd at total track count on final batch to prevent OOB error
batchEnd := min(batchStart+batchSize, totalTracks)
currentBatch := (batchStart / batchSize) + 1
// create array to hold valid listens
var validTracks []SpotifyTrack var validTracks []SpotifyTrack
for i := batchStart; i < batchEnd; i++ { for i := batchStart; i < batchEnd; i++ {
// if current track is listened to for 20 sec and name and artist is not if tracks[i].Played >= 20000 && // 20 seconds
// blank, add to validTracks array tracks[i].Name != "" &&
if tracks[i].Played >= 20000 && tracks[i].Name != "" && tracks[i].Artist != "" { tracks[i].Artist != "" {
validTracks = append(validTracks, tracks[i]) validTracks = append(validTracks, tracks[i])
} }
} }
// if there are no valid tracks in this batch, go to the next
if len(validTracks) == 0 { if len(validTracks) == 0 {
batchStart += batchSize
// Send progress update even for empty batches
if progressChan != nil {
progressChan <- ProgressUpdate{
CurrentPage: currentBatch,
CompletedPages: currentBatch,
TotalPages: totalBatches,
TracksImported: totalImported,
Status: "running",
}
}
continue continue
} }
// find replayed tracks in the batch that was just gathered tracksToSkip, err := getExistingTracks(userId, validTracks)
existing, err := getExistingTracks(userId, validTracks)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Error checking existing tracks: %v\n", err) fmt.Fprintf(os.Stderr, "Error checking existing tracks: %v\n", err)
batchStart += batchSize
continue continue
} }
// get data of struct pointer
src := &trackSource{ src := &trackSource{
tracks: validTracks, tracks: validTracks,
existing: existing, tracksToSkip: tracksToSkip,
idx: 0, idx: 0,
userId: userId, userId: userId,
} }
// insert all valid tracks from current batch into db
copyCount, err := db.Pool.CopyFrom( copyCount, err := db.Pool.CopyFrom(
context.Background(), context.Background(),
pgx.Identifier{"history"}, pgx.Identifier{"history"},
@@ -237,6 +293,31 @@ func ImportSpotify(tracks []SpotifyTrack, userId int) error {
} else { } else {
totalImported += int(copyCount) totalImported += int(copyCount)
} }
// Send progress update
if progressChan != nil {
progressChan <- ProgressUpdate{
CurrentPage: currentBatch,
CompletedPages: currentBatch,
TotalPages: totalBatches,
TracksImported: totalImported,
Status: "running",
}
}
batchStart += batchSize
} }
// Send completion update
if progressChan != nil {
progressChan <- ProgressUpdate{
CurrentPage: totalBatches,
CompletedPages: totalBatches,
TotalPages: totalBatches,
TracksImported: totalImported,
Status: "completed",
}
}
return nil return nil
} }

83
static/import.js Normal file
View File

@@ -0,0 +1,83 @@
function handleImport(formId, progressPrefix, endpoint, progressUrl, formatLabel) {
const form = document.getElementById(formId);
const progressContainer = document.getElementById(progressPrefix + '-progress');
const progressFill = document.getElementById(progressPrefix + '-progress-fill');
const progressText = document.getElementById(progressPrefix + '-progress-text');
const progressStatus = document.getElementById(progressPrefix + '-progress-status');
const progressTracks = document.getElementById(progressPrefix + '-progress-tracks');
const progressError = document.getElementById(progressPrefix + '-progress-error');
const progressSuccess = document.getElementById(progressPrefix + '-progress-success');
form.addEventListener('submit', async function(e) {
e.preventDefault();
// Reset and show progress
progressFill.style.width = '0%';
progressFill.classList.add('animating');
progressText.textContent = '0%';
progressStatus.textContent = 'Starting import...';
progressTracks.textContent = '';
progressError.textContent = '';
progressSuccess.textContent = '';
progressContainer.style.display = 'block';
try {
const response = await fetch(endpoint, {
method: 'POST',
body: progressPrefix === 'lastfm'
? new URLSearchParams(new FormData(form))
: new FormData(form)
});
if (!response.ok) throw new Error('Failed to start import: ' + response.statusText);
const { job_id } = await response.json();
const eventSource = new EventSource(progressUrl + job_id);
eventSource.onmessage = function(event) {
const update = JSON.parse(event.data);
if (update.status === 'connected') return;
if (update.total_pages > 0) {
const completed = update.completed_pages || update.current_page || 0;
const percent = Math.round((completed / update.total_pages) * 100);
progressFill.style.width = percent + '%';
progressText.textContent = percent + '%';
progressStatus.textContent = 'Processing ' + formatLabel + ' ' + completed + ' of ' + update.total_pages;
}
if (update.tracks_imported !== undefined) {
progressTracks.textContent = update.tracks_imported.toLocaleString() + ' tracks imported';
}
if (update.status === 'completed') {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import completed!';
progressSuccess.textContent = 'Successfully imported ' + update.tracks_imported.toLocaleString() + ' tracks from ' + (progressPrefix === 'spotify' ? 'Spotify' : 'Last.fm');
eventSource.close();
form.reset();
} else if (update.status === 'error') {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import failed';
progressError.textContent = 'Error: ' + (update.error || 'Unknown error');
eventSource.close();
}
};
eventSource.onerror = function() {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Connection error';
progressError.textContent = 'Lost connection to server. The import may still be running in the background.';
eventSource.close();
};
} catch (err) {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import failed';
progressError.textContent = 'Error: ' + err.message;
}
});
}
handleImport('spotify-form', 'spotify', '/import/spotify', '/import/spotify/progress?job=', 'batch');
handleImport('lastfm-form', 'lastfm', '/import/lastfm', '/import/lastfm/progress?job=', 'page');

View File

@@ -19,10 +19,21 @@
<div class="import-section"> <div class="import-section">
<h2>Spotify</h2> <h2>Spotify</h2>
<p>Import your Spotify listening history from your data export.</p> <p>Import your Spotify listening history from your data export.</p>
<form method="POST" action="/import/spotify" enctype="multipart/form-data"> <form id="spotify-form" method="POST" action="/import/spotify" enctype="multipart/form-data">
<input type="file" name="json_files" accept=".json,application/json" multiple required> <input type="file" name="json_files" accept=".json,application/json" multiple required>
<button type="submit">Upload Spotify Data</button> <button type="submit">Upload Spotify Data</button>
</form> </form>
<div id="spotify-progress" class="progress-container" style="display: none;">
<div class="progress-status" id="spotify-progress-status">Initializing...</div>
<div class="progress-bar-wrapper">
<div class="progress-bar-fill" id="spotify-progress-fill"></div>
<div class="progress-text" id="spotify-progress-text">0%</div>
</div>
<div class="progress-tracks" id="spotify-progress-tracks"></div>
<div class="progress-error" id="spotify-progress-error"></div>
<div class="progress-success" id="spotify-progress-success"></div>
</div>
</div> </div>
<div class="import-section"> <div class="import-section">
@@ -35,114 +46,18 @@
</form> </form>
<div id="lastfm-progress" class="progress-container" style="display: none;"> <div id="lastfm-progress" class="progress-container" style="display: none;">
<div class="progress-status" id="progress-status">Initializing...</div> <div class="progress-status" id="lastfm-progress-status">Initializing...</div>
<div class="progress-bar-wrapper"> <div class="progress-bar-wrapper">
<div class="progress-bar-fill" id="progress-fill"></div> <div class="progress-bar-fill" id="lastfm-progress-fill"></div>
<div class="progress-text" id="progress-text">0%</div> <div class="progress-text" id="lastfm-progress-text">0%</div>
</div> </div>
<div class="progress-tracks" id="progress-tracks"></div> <div class="progress-tracks" id="lastfm-progress-tracks"></div>
<div class="progress-error" id="progress-error"></div> <div class="progress-error" id="lastfm-progress-error"></div>
<div class="progress-success" id="progress-success"></div> <div class="progress-success" id="lastfm-progress-success"></div>
</div> </div>
</div> </div>
</div> </div>
<script> <script src="/files/import.js"></script>
document.getElementById('lastfm-form').addEventListener('submit', async function(e) {
e.preventDefault();
const form = e.target;
const formData = new FormData(form);
const progressContainer = document.getElementById('lastfm-progress');
const progressFill = document.getElementById('progress-fill');
const progressText = document.getElementById('progress-text');
const progressStatus = document.getElementById('progress-status');
const progressTracks = document.getElementById('progress-tracks');
const progressError = document.getElementById('progress-error');
const progressSuccess = document.getElementById('progress-success');
// Reset and show progress container
progressFill.style.width = '0%';
progressFill.classList.add('animating');
progressText.textContent = '0%';
progressStatus.textContent = 'Starting import...';
progressTracks.textContent = '';
progressError.textContent = '';
progressSuccess.textContent = '';
progressContainer.style.display = 'block';
try {
// Convert FormData to URLSearchParams for proper form encoding
const params = new URLSearchParams();
for (const [key, value] of formData) {
params.append(key, value);
}
// Start the import
const response = await fetch('/import/lastfm', {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
body: params
});
if (!response.ok) {
throw new Error('Failed to start import: ' + response.statusText);
}
const data = await response.json();
const jobId = data.job_id;
// Connect to SSE endpoint
const eventSource = new EventSource('/import/lastfm/progress?job=' + jobId);
eventSource.onmessage = function(event) {
const update = JSON.parse(event.data);
if (update.status === 'connected') {
return;
}
if (update.total_pages > 0) {
const completed = update.completed_pages || update.current_page || 0;
const percent = Math.round((completed / update.total_pages) * 100);
progressFill.style.width = percent + '%';
progressText.textContent = percent + '%';
progressStatus.textContent = 'Processing page ' + completed + ' of ' + update.total_pages;
}
if (update.tracks_imported !== undefined) {
progressTracks.textContent = update.tracks_imported.toLocaleString() + ' tracks imported';
}
if (update.status === 'completed') {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import completed!';
progressSuccess.textContent = 'Successfully imported ' + update.tracks_imported.toLocaleString() + ' tracks from Last.fm';
eventSource.close();
form.reset();
} else if (update.status === 'error') {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import failed';
progressError.textContent = 'Error: ' + (update.error || 'Unknown error');
eventSource.close();
}
};
eventSource.onerror = function(err) {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Connection error';
progressError.textContent = 'Lost connection to server. The import may still be running in the background.';
eventSource.close();
};
} catch (err) {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import failed';
progressError.textContent = 'Error: ' + err.message;
}
});
</script>
</body> </body>
</html> </html>

View File

@@ -506,10 +506,32 @@ func importSpotifyHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
if err := migrate.ImportSpotify(allTracks, userId); err != nil { jobID, err := generateID()
http.Error(w, "Failed to process tracks", http.StatusInternalServerError) if err != nil {
fmt.Fprintf(os.Stderr, "Error generating jobID: %v\n", err)
http.Error(w, "Error generating jobID", http.StatusBadRequest)
return return
} }
progressChan := make(chan migrate.ProgressUpdate, 100)
jobsMu.Lock()
importJobs[jobID] = progressChan
jobsMu.Unlock()
go func() {
migrate.ImportSpotify(allTracks, userId, progressChan)
jobsMu.Lock()
delete(importJobs, jobID)
jobsMu.Unlock()
close(progressChan)
}()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"job_id": jobID,
"status": "started",
})
} }
func importLastFMHandler(w http.ResponseWriter, r *http.Request) { func importLastFMHandler(w http.ResponseWriter, r *http.Request) {
@@ -617,6 +639,50 @@ func importLastFMProgressHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
func importSpotifyProgressHandler(w http.ResponseWriter, r *http.Request) {
jobID := r.URL.Query().Get("job")
if jobID == "" {
http.Error(w, "Missing job ID", http.StatusBadRequest)
return
}
jobsMu.RLock()
job, exists := importJobs[jobID]
jobsMu.RUnlock()
if !exists {
http.Error(w, "Job not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming not supported", http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "data: %s\n\n", `{"status":"connected"}`)
flusher.Flush()
for update := range job {
data, err := json.Marshal(update)
if err != nil {
continue
}
fmt.Fprintf(w, "data: %s\n\n", string(data))
flusher.Flush()
if update.Status == "completed" || update.Status == "error" {
return
}
}
}
func Start() { func Start() {
addr := ":1234" addr := ":1234"
r := chi.NewRouter() r := chi.NewRouter()
@@ -632,6 +698,7 @@ func Start() {
r.Post("/import/lastfm", importLastFMHandler) r.Post("/import/lastfm", importLastFMHandler)
r.Post("/import/spotify", importSpotifyHandler) r.Post("/import/spotify", importSpotifyHandler)
r.Get("/import/lastfm/progress", importLastFMProgressHandler) r.Get("/import/lastfm/progress", importLastFMProgressHandler)
r.Get("/import/spotify/progress", importSpotifyProgressHandler)
fmt.Printf("WebUI starting on %s\n", addr) fmt.Printf("WebUI starting on %s\n", addr)
prot := http.NewCrossOriginProtection() prot := http.NewCrossOriginProtection()
http.ListenAndServe(addr, prot.Handler(r)) http.ListenAndServe(addr, prot.Handler(r))