From 2f56d16f98685e93ddde35d8923f101ad44ab4af Mon Sep 17 00:00:00 2001 From: "Alessandro (Ale) Segala" <43508+ItalyPaleAle@users.noreply.github.com> Date: Sat, 7 Mar 2026 09:07:26 -0800 Subject: [PATCH] fix: various fixes in background jobs (#1362) Co-authored-by: Elias Schneider --- backend/internal/job/analytics_job.go | 2 +- backend/internal/job/api_key_expiry_job.go | 8 +- backend/internal/job/db_cleanup_job.go | 31 +++-- backend/internal/job/file_cleanup_job.go | 20 +++- backend/internal/job/geoloite_update_job.go | 2 +- backend/internal/job/ldap_job.go | 6 +- backend/internal/job/scheduler.go | 48 ++++++-- backend/internal/job/scim_job.go | 4 +- backend/internal/service/api_key_service.go | 26 ++-- backend/internal/service/app_lock_service.go | 105 ++++++++++------ .../internal/service/app_lock_service_test.go | 112 ++++++++++++++++++ backend/internal/service/email_service.go | 3 +- backend/internal/service/scheduler.go | 25 ++++ backend/internal/service/scim_service.go | 40 +++---- .../api-key-expiring-soon_html.tmpl | 2 +- .../api-key-expiring-soon_text.tmpl | 2 +- .../postgres/20260304090200_indexes.down.sql | 1 + .../postgres/20260304090200_indexes.up.sql | 6 + .../sqlite/20260304090200_indexes.down.sql | 1 + .../sqlite/20260304090200_indexes.up.sql | 12 ++ .../emails/api-key-expiring-soon.tsx | 2 +- 21 files changed, 343 insertions(+), 115 deletions(-) create mode 100644 backend/internal/service/scheduler.go create mode 100644 backend/resources/migrations/postgres/20260304090200_indexes.down.sql create mode 100644 backend/resources/migrations/postgres/20260304090200_indexes.up.sql create mode 100644 backend/resources/migrations/sqlite/20260304090200_indexes.down.sql create mode 100644 backend/resources/migrations/sqlite/20260304090200_indexes.up.sql diff --git a/backend/internal/job/analytics_job.go b/backend/internal/job/analytics_job.go index f67e2042..c2e658df 100644 --- a/backend/internal/job/analytics_job.go +++ 
b/backend/internal/job/analytics_job.go @@ -28,7 +28,7 @@ func (s *Scheduler) RegisterAnalyticsJob(ctx context.Context, appConfig *service appConfig: appConfig, httpClient: httpClient, } - return s.RegisterJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, true) + return s.RegisterJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, service.RegisterJobOpts{RunImmediately: true}) } type AnalyticsJob struct { diff --git a/backend/internal/job/api_key_expiry_job.go b/backend/internal/job/api_key_expiry_job.go index 6524089b..2481010e 100644 --- a/backend/internal/job/api_key_expiry_job.go +++ b/backend/internal/job/api_key_expiry_job.go @@ -22,7 +22,7 @@ func (s *Scheduler) RegisterApiKeyExpiryJob(ctx context.Context, apiKeyService * } // Send every day at midnight - return s.RegisterJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, false) + return s.RegisterJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, service.RegisterJobOpts{}) } func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) error { @@ -42,7 +42,11 @@ func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) err } err = j.apiKeyService.SendApiKeyExpiringSoonEmail(ctx, key) if err != nil { - slog.ErrorContext(ctx, "Failed to send expiring API key notification email", slog.String("key", key.ID), slog.Any("error", err)) + slog.ErrorContext(ctx, "Failed to send expiring API key notification email", + slog.String("key", key.ID), + slog.String("user", key.User.ID), + slog.Any("error", err), + ) } } return nil diff --git a/backend/internal/job/db_cleanup_job.go b/backend/internal/job/db_cleanup_job.go index fca96516..4872fae1 100644 --- a/backend/internal/job/db_cleanup_job.go +++ b/backend/internal/job/db_cleanup_job.go @@ -7,28 +7,37 @@ import ( "log/slog" "time" - "github.com/go-co-op/gocron/v2" + 
backoff "github.com/cenkalti/backoff/v5" "gorm.io/gorm" "github.com/pocket-id/pocket-id/backend/internal/common" "github.com/pocket-id/pocket-id/backend/internal/model" datatype "github.com/pocket-id/pocket-id/backend/internal/model/types" + "github.com/pocket-id/pocket-id/backend/internal/service" ) func (s *Scheduler) RegisterDbCleanupJobs(ctx context.Context, db *gorm.DB) error { jobs := &DbCleanupJobs{db: db} - // Run every 24 hours (but with some jitter so they don't run at the exact same time), and now - def := gocron.DurationRandomJob(24*time.Hour-2*time.Minute, 24*time.Hour+2*time.Minute) + newBackOff := func() *backoff.ExponentialBackOff { + bo := backoff.NewExponentialBackOff() + bo.Multiplier = 4 + bo.RandomizationFactor = 0.1 + bo.InitialInterval = time.Second + bo.MaxInterval = 45 * time.Second + return bo + } + + // Use exponential backoff for each DB cleanup job so transient query failures are retried automatically rather than causing an immediate job failure return errors.Join( - s.RegisterJob(ctx, "ClearWebauthnSessions", def, jobs.clearWebauthnSessions, true), - s.RegisterJob(ctx, "ClearOneTimeAccessTokens", def, jobs.clearOneTimeAccessTokens, true), - s.RegisterJob(ctx, "ClearSignupTokens", def, jobs.clearSignupTokens, true), - s.RegisterJob(ctx, "ClearEmailVerificationTokens", def, jobs.clearEmailVerificationTokens, true), - s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", def, jobs.clearOidcAuthorizationCodes, true), - s.RegisterJob(ctx, "ClearOidcRefreshTokens", def, jobs.clearOidcRefreshTokens, true), - s.RegisterJob(ctx, "ClearReauthenticationTokens", def, jobs.clearReauthenticationTokens, true), - s.RegisterJob(ctx, "ClearAuditLogs", def, jobs.clearAuditLogs, true), + s.RegisterJob(ctx, "ClearWebauthnSessions", jobDefWithJitter(24*time.Hour), jobs.clearWebauthnSessions, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}), + s.RegisterJob(ctx, "ClearOneTimeAccessTokens", jobDefWithJitter(24*time.Hour), 
jobs.clearOneTimeAccessTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}), + s.RegisterJob(ctx, "ClearSignupTokens", jobDefWithJitter(24*time.Hour), jobs.clearSignupTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}), + s.RegisterJob(ctx, "ClearEmailVerificationTokens", jobDefWithJitter(24*time.Hour), jobs.clearEmailVerificationTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}), + s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", jobDefWithJitter(24*time.Hour), jobs.clearOidcAuthorizationCodes, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}), + s.RegisterJob(ctx, "ClearOidcRefreshTokens", jobDefWithJitter(24*time.Hour), jobs.clearOidcRefreshTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}), + s.RegisterJob(ctx, "ClearReauthenticationTokens", jobDefWithJitter(24*time.Hour), jobs.clearReauthenticationTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}), + s.RegisterJob(ctx, "ClearAuditLogs", jobDefWithJitter(24*time.Hour), jobs.clearAuditLogs, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}), ) } diff --git a/backend/internal/job/file_cleanup_job.go b/backend/internal/job/file_cleanup_job.go index 2b141dac..71f0dd34 100644 --- a/backend/internal/job/file_cleanup_job.go +++ b/backend/internal/job/file_cleanup_job.go @@ -13,20 +13,26 @@ import ( "gorm.io/gorm" "github.com/pocket-id/pocket-id/backend/internal/model" + "github.com/pocket-id/pocket-id/backend/internal/service" "github.com/pocket-id/pocket-id/backend/internal/storage" ) func (s *Scheduler) RegisterFileCleanupJobs(ctx context.Context, db *gorm.DB, fileStorage storage.FileStorage) error { jobs := &FileCleanupJobs{db: db, fileStorage: fileStorage} - err := s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, false) + var errs []error + errs = append(errs, + 
s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, service.RegisterJobOpts{}), + ) // Only necessary for file system storage if fileStorage.Type() == storage.TypeFileSystem { - err = errors.Join(err, s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, true)) + errs = append(errs, + s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, service.RegisterJobOpts{RunImmediately: true}), + ) } - return err + return errors.Join(errs...) } type FileCleanupJobs struct { @@ -68,7 +74,8 @@ func (j *FileCleanupJobs) clearUnusedDefaultProfilePictures(ctx context.Context) // If these initials aren't used by any user, delete the file if _, ok := initialsInUse[initials]; !ok { filePath := path.Join(defaultPicturesDir, filename) - if err := j.fileStorage.Delete(ctx, filePath); err != nil { + err = j.fileStorage.Delete(ctx, filePath) + if err != nil { slog.ErrorContext(ctx, "Failed to delete unused default profile picture", slog.String("path", filePath), slog.Any("error", err)) } else { filesDeleted++ @@ -95,8 +102,9 @@ func (j *FileCleanupJobs) clearOrphanedTempFiles(ctx context.Context) error { return nil } - if err := j.fileStorage.Delete(ctx, p.Path); err != nil { - slog.ErrorContext(ctx, "Failed to delete temp file", slog.String("path", p.Path), slog.Any("error", err)) + rErr := j.fileStorage.Delete(ctx, p.Path) + if rErr != nil { + slog.ErrorContext(ctx, "Failed to delete temp file", slog.String("path", p.Path), slog.Any("error", rErr)) return nil } deleted++ diff --git a/backend/internal/job/geoloite_update_job.go b/backend/internal/job/geoloite_update_job.go index 65353757..7b163a8d 100644 --- a/backend/internal/job/geoloite_update_job.go +++ b/backend/internal/job/geoloite_update_job.go @@ -23,7 +23,7 @@ func (s *Scheduler) RegisterGeoLiteUpdateJobs(ctx context.Context, geoLiteServic jobs 
:= &GeoLiteUpdateJobs{geoLiteService: geoLiteService} // Run every 24 hours (and right away) - return s.RegisterJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, true) + return s.RegisterJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, service.RegisterJobOpts{RunImmediately: true}) } func (j *GeoLiteUpdateJobs) updateGoeLiteDB(ctx context.Context) error { diff --git a/backend/internal/job/ldap_job.go b/backend/internal/job/ldap_job.go index 33646860..1547d954 100644 --- a/backend/internal/job/ldap_job.go +++ b/backend/internal/job/ldap_job.go @@ -4,8 +4,6 @@ import ( "context" "time" - "github.com/go-co-op/gocron/v2" - "github.com/pocket-id/pocket-id/backend/internal/service" ) @@ -17,8 +15,8 @@ type LdapJobs struct { func (s *Scheduler) RegisterLdapJobs(ctx context.Context, ldapService *service.LdapService, appConfigService *service.AppConfigService) error { jobs := &LdapJobs{ldapService: ldapService, appConfigService: appConfigService} - // Register the job to run every hour - return s.RegisterJob(ctx, "SyncLdap", gocron.DurationJob(time.Hour), jobs.syncLdap, true) + // Register the job to run every hour (with some jitter) + return s.RegisterJob(ctx, "SyncLdap", jobDefWithJitter(time.Hour), jobs.syncLdap, service.RegisterJobOpts{RunImmediately: true}) } func (j *LdapJobs) syncLdap(ctx context.Context) error { diff --git a/backend/internal/job/scheduler.go b/backend/internal/job/scheduler.go index 2a48c2a8..2ef2019b 100644 --- a/backend/internal/job/scheduler.go +++ b/backend/internal/job/scheduler.go @@ -5,9 +5,13 @@ import ( "errors" "fmt" "log/slog" + "time" + backoff "github.com/cenkalti/backoff/v5" "github.com/go-co-op/gocron/v2" "github.com/google/uuid" + + "github.com/pocket-id/pocket-id/backend/internal/service" ) type Scheduler struct { @@ -33,16 +37,12 @@ func (s *Scheduler) RemoveJob(name string) error { if job.Name() == name { err := s.scheduler.RemoveJob(job.ID()) if err != nil { - 
errs = append(errs, fmt.Errorf("failed to unqueue job %q with ID %q: %w", name, job.ID().String(), err)) + errs = append(errs, fmt.Errorf("failed to dequeue job %q with ID %q: %w", name, job.ID().String(), err)) } } } - if len(errs) > 0 { - return errors.Join(errs...) - } - - return nil + return errors.Join(errs...) } // Run the scheduler. @@ -64,7 +64,29 @@ func (s *Scheduler) Run(ctx context.Context) error { return nil } -func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool, extraOptions ...gocron.JobOption) error { +func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, jobFn func(ctx context.Context) error, opts service.RegisterJobOpts) error { + // If a BackOff strategy is provided, wrap the job with retry logic + if opts.BackOff != nil { + origJob := jobFn + jobFn = func(ctx context.Context) error { + _, err := backoff.Retry( + ctx, + func() (struct{}, error) { + return struct{}{}, origJob(ctx) + }, + backoff.WithBackOff(opts.BackOff), + backoff.WithNotify(func(err error, d time.Duration) { + slog.WarnContext(ctx, "Job failed, retrying", + slog.String("name", name), + slog.Any("error", err), + slog.Duration("retryIn", d), + ) + }), + ) + return err + } + } + jobOptions := []gocron.JobOption{ gocron.WithContext(ctx), gocron.WithName(name), @@ -91,13 +113,13 @@ func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.Job ), } - if runImmediately { + if opts.RunImmediately { jobOptions = append(jobOptions, gocron.JobOption(gocron.WithStartImmediately())) } - jobOptions = append(jobOptions, extraOptions...) + jobOptions = append(jobOptions, opts.ExtraOptions...) - _, err := s.scheduler.NewJob(def, gocron.NewTask(job), jobOptions...) + _, err := s.scheduler.NewJob(def, gocron.NewTask(jobFn), jobOptions...) 
if err != nil { return fmt.Errorf("failed to register job %q: %w", name, err) @@ -105,3 +127,9 @@ func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.Job return nil } + +func jobDefWithJitter(interval time.Duration) gocron.JobDefinition { + const jitter = 5 * time.Minute + + return gocron.DurationRandomJob(interval-jitter, interval+jitter) +} diff --git a/backend/internal/job/scim_job.go b/backend/internal/job/scim_job.go index 1ea8ee96..5c4336f6 100644 --- a/backend/internal/job/scim_job.go +++ b/backend/internal/job/scim_job.go @@ -16,8 +16,8 @@ type ScimJobs struct { func (s *Scheduler) RegisterScimJobs(ctx context.Context, scimService *service.ScimService) error { jobs := &ScimJobs{scimService: scimService} - // Register the job to run every hour - return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, true) + // Register the job to run every hour (and immediately at startup) + return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, service.RegisterJobOpts{RunImmediately: true}) } func (j *ScimJobs) SyncScim(ctx context.Context) error { diff --git a/backend/internal/service/api_key_service.go b/backend/internal/service/api_key_service.go index 3c29d6e9..cb409ec5 100644 --- a/backend/internal/service/api_key_service.go +++ b/backend/internal/service/api_key_service.go @@ -3,6 +3,7 @@ package service import ( "context" "errors" + "fmt" "time" datatype "github.com/pocket-id/pocket-id/backend/internal/model/types" @@ -205,36 +206,33 @@ func (s *ApiKeyService) ListExpiringApiKeys(ctx context.Context, daysAhead int) } func (s *ApiKeyService) SendApiKeyExpiringSoonEmail(ctx context.Context, apiKey model.ApiKey) error { - user := apiKey.User - - if user.ID == "" { - if err := s.db.WithContext(ctx).First(&user, "id = ?", apiKey.UserID).Error; err != nil { - return err - } - } - - if user.Email == nil { + if apiKey.User.Email == nil { return &common.UserEmailNotSetError{} } err := SendEmail(ctx,
s.emailService, email.Address{ - Name: user.FullName(), - Email: *user.Email, + Name: apiKey.User.FullName(), + Email: *apiKey.User.Email, }, ApiKeyExpiringSoonTemplate, &ApiKeyExpiringSoonTemplateData{ ApiKeyName: apiKey.Name, ExpiresAt: apiKey.ExpiresAt.ToTime(), - Name: user.FirstName, + Name: apiKey.User.FirstName, }) if err != nil { - return err + return fmt.Errorf("error sending notification email: %w", err) } // Mark the API key as having had an expiration email sent - return s.db.WithContext(ctx). + err = s.db.WithContext(ctx). Model(&model.ApiKey{}). Where("id = ?", apiKey.ID). Update("expiration_email_sent", true). Error + if err != nil { + return fmt.Errorf("error recording expiration sent email in database: %w", err) + } + + return nil } func (s *ApiKeyService) initStaticApiKeyUser(ctx context.Context) (user model.User, err error) { diff --git a/backend/internal/service/app_lock_service.go b/backend/internal/service/app_lock_service.go index 339e41cd..0a309b15 100644 --- a/backend/internal/service/app_lock_service.go +++ b/backend/internal/service/app_lock_service.go @@ -73,7 +73,10 @@ func (lv *lockValue) Unmarshal(raw string) error { // Acquire obtains the lock. When force is true, the lock is stolen from any existing owner. // If the lock is forcefully acquired, it blocks until the previous lock has expired. 
func (s *AppLockService) Acquire(ctx context.Context, force bool) (waitUntil time.Time, err error) { - tx := s.db.Begin() + tx := s.db.WithContext(ctx).Begin() + if tx.Error != nil { + return time.Time{}, fmt.Errorf("begin lock transaction: %w", tx.Error) + } defer func() { tx.Rollback() }() @@ -174,7 +177,8 @@ func (s *AppLockService) RunRenewal(ctx context.Context) error { case <-ctx.Done(): return nil case <-ticker.C: - if err := s.renew(ctx); err != nil { + err := s.renew(ctx) + if err != nil { return fmt.Errorf("renew lock: %w", err) } } @@ -183,33 +187,43 @@ func (s *AppLockService) RunRenewal(ctx context.Context) error { // Release releases the lock if it is held by this process. func (s *AppLockService) Release(ctx context.Context) error { - opCtx, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() + db, err := s.db.DB() + if err != nil { + return fmt.Errorf("failed to get DB connection: %w", err) + } var query string switch s.db.Name() { case "sqlite": query = ` - DELETE FROM kv - WHERE key = ? - AND json_extract(value, '$.lock_id') = ? - ` +DELETE FROM kv +WHERE key = ? + AND json_extract(value, '$.lock_id') = ? 
+` case "postgres": query = ` - DELETE FROM kv - WHERE key = $1 - AND value::json->>'lock_id' = $2 - ` +DELETE FROM kv +WHERE key = $1 + AND value::json->>'lock_id' = $2 +` default: return fmt.Errorf("unsupported database dialect: %s", s.db.Name()) } - res := s.db.WithContext(opCtx).Exec(query, lockKey, s.lockID) - if res.Error != nil { - return fmt.Errorf("release lock failed: %w", res.Error) + opCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + res, err := db.ExecContext(opCtx, query, lockKey, s.lockID) + if err != nil { + return fmt.Errorf("release lock failed: %w", err) } - if res.RowsAffected == 0 { + count, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("failed to count affected rows: %w", err) + } + + if count == 0 { slog.Warn("Application lock not held by this process, cannot release", slog.Int64("process_id", s.processID), slog.String("host_id", s.hostID), @@ -225,6 +239,11 @@ func (s *AppLockService) Release(ctx context.Context) error { // renew tries to renew the lock, retrying up to renewRetries times (sleeping 1s between attempts). func (s *AppLockService) renew(ctx context.Context) error { + db, err := s.db.DB() + if err != nil { + return fmt.Errorf("failed to get DB connection: %w", err) + } + var lastErr error for attempt := 1; attempt <= renewRetries; attempt++ { now := time.Now() @@ -246,42 +265,56 @@ func (s *AppLockService) renew(ctx context.Context) error { switch s.db.Name() { case "sqlite": query = ` - UPDATE kv - SET value = ? - WHERE key = ? - AND json_extract(value, '$.lock_id') = ? - AND json_extract(value, '$.expires_at') > ? - ` +UPDATE kv +SET value = ? +WHERE key = ? + AND json_extract(value, '$.lock_id') = ? + AND json_extract(value, '$.expires_at') > ? 
+` case "postgres": query = ` - UPDATE kv - SET value = $1 - WHERE key = $2 - AND value::json->>'lock_id' = $3 - AND ((value::json->>'expires_at')::bigint > $4) - ` +UPDATE kv +SET value = $1 +WHERE key = $2 + AND value::json->>'lock_id' = $3 + AND ((value::json->>'expires_at')::bigint > $4) +` default: return fmt.Errorf("unsupported database dialect: %s", s.db.Name()) } opCtx, cancel := context.WithTimeout(ctx, 3*time.Second) - res := s.db.WithContext(opCtx).Exec(query, raw, lockKey, s.lockID, nowUnix) + res, err := db.ExecContext(opCtx, query, raw, lockKey, s.lockID, nowUnix) cancel() - switch { - case res.Error != nil: - lastErr = fmt.Errorf("lock renewal failed: %w", res.Error) - case res.RowsAffected == 0: - // Must be after checking res.Error - return ErrLockLost - default: + // Query succeeded, but may have updated 0 rows + if err == nil { + count, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("failed to count affected rows: %w", err) + } + + // If no rows were updated, we lost the lock + if count == 0 { + return ErrLockLost + } + + // All good slog.Debug("Renewed application lock", slog.Int64("process_id", s.processID), slog.String("host_id", s.hostID), + slog.Duration("duration", time.Since(now)), ) return nil } + // If we're here, we have an error that can be retried + slog.Debug("Application lock renewal attempt failed", + slog.Any("error", err), + slog.Duration("duration", time.Since(now)), + ) + lastErr = fmt.Errorf("lock renewal failed: %w", err) + // Wait before next attempt or cancel if context is done if attempt < renewRetries { select { diff --git a/backend/internal/service/app_lock_service_test.go b/backend/internal/service/app_lock_service_test.go index 95b22f51..8f829dff 100644 --- a/backend/internal/service/app_lock_service_test.go +++ b/backend/internal/service/app_lock_service_test.go @@ -49,6 +49,23 @@ func readLockValue(t *testing.T, db *gorm.DB) lockValue { return value } +func lockDatabaseForWrite(t *testing.T, db 
*gorm.DB) *gorm.DB { + t.Helper() + + tx := db.Begin() + require.NoError(t, tx.Error) + + // Keep a write transaction open to block other queries. + err := tx.Exec( + `INSERT INTO kv (key, value) VALUES (?, ?) ON CONFLICT(key) DO NOTHING`, + lockKey, + `{"expires_at":0}`, + ).Error + require.NoError(t, err) + + return tx +} + func TestAppLockServiceAcquire(t *testing.T) { t.Run("creates new lock when none exists", func(t *testing.T) { db := testutils.NewDatabaseForTest(t) @@ -99,6 +116,66 @@ func TestAppLockServiceAcquire(t *testing.T) { require.Equal(t, service.hostID, stored.HostID) require.Greater(t, stored.ExpiresAt, time.Now().Unix()) }) + + t.Run("force acquisition returns wait duration when stealing active lock", func(t *testing.T) { + db := testutils.NewDatabaseForTest(t) + service := newTestAppLockService(t, db) + + existing := lockValue{ + ProcessID: 99, + HostID: "other-host", + LockID: "other-lock-id", + ExpiresAt: time.Now().Add(ttl).Unix(), + } + insertLock(t, db, existing) + + waitUntil, err := service.Acquire(context.Background(), true) + require.NoError(t, err) + require.WithinDuration(t, time.Unix(existing.ExpiresAt, 0), waitUntil, time.Second) + }) + + t.Run("force acquisition does not wait when lock id is unchanged", func(t *testing.T) { + db := testutils.NewDatabaseForTest(t) + service := newTestAppLockService(t, db) + + insertLock(t, db, lockValue{ + ProcessID: 99, + HostID: "other-host", + LockID: service.lockID, + ExpiresAt: time.Now().Add(ttl).Unix(), + }) + + waitUntil, err := service.Acquire(context.Background(), true) + require.NoError(t, err) + require.True(t, waitUntil.IsZero()) + }) + + t.Run("returns error when existing lock value is invalid JSON", func(t *testing.T) { + db := testutils.NewDatabaseForTest(t) + service := newTestAppLockService(t, db) + + raw := "this-is-not-json" + err := db.Create(&model.KV{Key: lockKey, Value: &raw}).Error + require.NoError(t, err) + + _, err = service.Acquire(context.Background(), false) + 
require.ErrorContains(t, err, "decode existing lock value") + }) + + t.Run("returns context deadline exceeded when database is locked", func(t *testing.T) { + db := testutils.NewDatabaseForTest(t) + service := newTestAppLockService(t, db) + + tx := lockDatabaseForWrite(t, db) + defer tx.Rollback() + + ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond) + defer cancel() + + _, err := service.Acquire(ctx, false) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.ErrorContains(t, err, "begin lock transaction") + }) } func TestAppLockServiceRelease(t *testing.T) { @@ -134,6 +211,24 @@ func TestAppLockServiceRelease(t *testing.T) { stored := readLockValue(t, db) require.Equal(t, existing, stored) }) + + t.Run("returns context deadline exceeded when database is locked", func(t *testing.T) { + db := testutils.NewDatabaseForTest(t) + service := newTestAppLockService(t, db) + + _, err := service.Acquire(context.Background(), false) + require.NoError(t, err) + + tx := lockDatabaseForWrite(t, db) + defer tx.Rollback() + + ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond) + defer cancel() + + err = service.Release(ctx) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.ErrorContains(t, err, "release lock failed") + }) } func TestAppLockServiceRenew(t *testing.T) { @@ -186,4 +281,21 @@ func TestAppLockServiceRenew(t *testing.T) { err = service.renew(context.Background()) require.ErrorIs(t, err, ErrLockLost) }) + + t.Run("returns context deadline exceeded when database is locked", func(t *testing.T) { + db := testutils.NewDatabaseForTest(t) + service := newTestAppLockService(t, db) + + _, err := service.Acquire(context.Background(), false) + require.NoError(t, err) + + tx := lockDatabaseForWrite(t, db) + defer tx.Rollback() + + ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond) + defer cancel() + + err = service.renew(ctx) + require.ErrorIs(t, err, context.DeadlineExceeded) 
+ }) } diff --git a/backend/internal/service/email_service.go b/backend/internal/service/email_service.go index 05affa5b..7dffa08c 100644 --- a/backend/internal/service/email_service.go +++ b/backend/internal/service/email_service.go @@ -150,7 +150,8 @@ func SendEmail[V any](ctx context.Context, srv *EmailService, toEmail email.Addr } // Send the email - if err := srv.sendEmailContent(client, toEmail, c); err != nil { + err = srv.sendEmailContent(client, toEmail, c) + if err != nil { return fmt.Errorf("send email content: %w", err) } diff --git a/backend/internal/service/scheduler.go b/backend/internal/service/scheduler.go new file mode 100644 index 00000000..b53f195b --- /dev/null +++ b/backend/internal/service/scheduler.go @@ -0,0 +1,25 @@ +package service + +import ( + "context" + + backoff "github.com/cenkalti/backoff/v5" + "github.com/go-co-op/gocron/v2" +) + +// RegisterJobOpts holds optional configuration for registering a scheduled job. +type RegisterJobOpts struct { + // RunImmediately runs the job immediately after registration. + RunImmediately bool + // ExtraOptions are additional gocron job options. + ExtraOptions []gocron.JobOption + // BackOff is an optional backoff strategy. If non-nil, the job will be wrapped + // with automatic retry logic using the provided backoff on transient failures. + BackOff backoff.BackOff +} + +// Scheduler is an interface for registering and managing background jobs. 
+type Scheduler interface { + RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, opts RegisterJobOpts) error + RemoveJob(name string) error +} diff --git a/backend/internal/service/scim_service.go b/backend/internal/service/scim_service.go index 976e2cd5..f21f8be7 100644 --- a/backend/internal/service/scim_service.go +++ b/backend/internal/service/scim_service.go @@ -34,11 +34,6 @@ const scimErrorBodyLimit = 4096 type scimSyncAction int -type Scheduler interface { - RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool, extraOptions ...gocron.JobOption) error - RemoveJob(name string) error -} - const ( scimActionNone scimSyncAction = iota scimActionCreated @@ -149,7 +144,7 @@ func (s *ScimService) ScheduleSync() { err := s.scheduler.RegisterJob( context.Background(), jobName, - gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start)), s.SyncAll, false) + gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start)), s.SyncAll, RegisterJobOpts{}) if err != nil { slog.Error("Failed to schedule SCIM sync", slog.Any("error", err)) @@ -168,7 +163,8 @@ func (s *ScimService) SyncAll(ctx context.Context) error { errs = append(errs, ctx.Err()) break } - if err := s.SyncServiceProvider(ctx, provider.ID); err != nil { + err = s.SyncServiceProvider(ctx, provider.ID) + if err != nil { errs = append(errs, fmt.Errorf("failed to sync SCIM provider %s: %w", provider.ID, err)) } } @@ -210,26 +206,20 @@ func (s *ScimService) SyncServiceProvider(ctx context.Context, serviceProviderID } var errs []error - var userStats scimSyncStats - var groupStats scimSyncStats // Sync users first, so that groups can reference them - if stats, err := s.syncUsers(ctx, provider, users, &userResources); err != nil { - errs = append(errs, err) - userStats = stats - } else { - userStats = stats - } - - stats, err := s.syncGroups(ctx, provider, groups, groupResources.Resources, 
userResources.Resources) + userStats, err := s.syncUsers(ctx, provider, users, &userResources) + if err != nil { + errs = append(errs, err) + } + + groupStats, err := s.syncGroups(ctx, provider, groups, groupResources.Resources, userResources.Resources) if err != nil { errs = append(errs, err) - groupStats = stats - } else { - groupStats = stats } if len(errs) > 0 { + err = errors.Join(errs...) slog.WarnContext(ctx, "SCIM sync completed with errors", slog.String("provider_id", provider.ID), slog.Int("error_count", len(errs)), @@ -240,12 +230,14 @@ func (s *ScimService) SyncServiceProvider(ctx context.Context, serviceProviderID slog.Int("groups_updated", groupStats.Updated), slog.Int("groups_deleted", groupStats.Deleted), slog.Duration("duration", time.Since(start)), + slog.Any("error", err), ) - return errors.Join(errs...) + return err } provider.LastSyncedAt = new(datatype.DateTime(time.Now())) - if err := s.db.WithContext(ctx).Save(&provider).Error; err != nil { + err = s.db.WithContext(ctx).Save(&provider).Error + if err != nil { return err } @@ -273,7 +265,7 @@ func (s *ScimService) syncUsers( // Update or create users for _, u := range users { - existing := getResourceByExternalID[dto.ScimUser](u.ID, resourceList.Resources) + existing := getResourceByExternalID(u.ID, resourceList.Resources) action, created, err := s.syncUser(ctx, provider, u, existing) if created != nil && existing == nil { @@ -434,7 +426,7 @@ func (s *ScimService) syncGroup( // Prepare group members members := make([]dto.ScimGroupMember, len(group.Users)) for i, user := range group.Users { - userResource := getResourceByExternalID[dto.ScimUser](user.ID, userResources) + userResource := getResourceByExternalID(user.ID, userResources) if userResource == nil { // Groups depend on user IDs already being provisioned return scimActionNone, fmt.Errorf("cannot sync group %s: user %s is not provisioned in SCIM provider", group.ID, user.ID) diff --git 
a/backend/resources/email-templates/api-key-expiring-soon_html.tmpl b/backend/resources/email-templates/api-key-expiring-soon_html.tmpl index 8b52a5a0..b9b3bb5c 100644 --- a/backend/resources/email-templates/api-key-expiring-soon_html.tmpl +++ b/backend/resources/email-templates/api-key-expiring-soon_html.tmpl @@ -1 +1 @@ -{{define "root"}}
{{.AppName}}

{{.AppName}}

API Key Expiring Soon

Warning

Hello {{.Data.Name}},
This is a reminder that your API key {{.Data.APIKeyName}} will expire on {{.Data.ExpiresAt.Format "2006-01-02 15:04:05 MST"}}.

Please generate a new API key if you need continued access.

{{end}} \ No newline at end of file +{{define "root"}}
{{.AppName}}

{{.AppName}}

API Key Expiring Soon

Warning

Hello {{.Data.Name}},
This is a reminder that your API key {{.Data.ApiKeyName}} will expire on {{.Data.ExpiresAt.Format "2006-01-02 15:04:05 MST"}}.

Please generate a new API key if you need continued access.

{{end}} \ No newline at end of file diff --git a/backend/resources/email-templates/api-key-expiring-soon_text.tmpl b/backend/resources/email-templates/api-key-expiring-soon_text.tmpl index ae7ba74b..247969d5 100644 --- a/backend/resources/email-templates/api-key-expiring-soon_text.tmpl +++ b/backend/resources/email-templates/api-key-expiring-soon_text.tmpl @@ -6,6 +6,6 @@ API KEY EXPIRING SOON Warning Hello {{.Data.Name}}, -This is a reminder that your API key {{.Data.APIKeyName}} will expire on {{.Data.ExpiresAt.Format "2006-01-02 15:04:05 MST"}}. +This is a reminder that your API key {{.Data.ApiKeyName}} will expire on {{.Data.ExpiresAt.Format "2006-01-02 15:04:05 MST"}}. Please generate a new API key if you need continued access.{{end}} \ No newline at end of file diff --git a/backend/resources/migrations/postgres/20260304090200_indexes.down.sql b/backend/resources/migrations/postgres/20260304090200_indexes.down.sql new file mode 100644 index 00000000..f8e19576 --- /dev/null +++ b/backend/resources/migrations/postgres/20260304090200_indexes.down.sql @@ -0,0 +1 @@ +-- No-op \ No newline at end of file diff --git a/backend/resources/migrations/postgres/20260304090200_indexes.up.sql b/backend/resources/migrations/postgres/20260304090200_indexes.up.sql new file mode 100644 index 00000000..b288f044 --- /dev/null +++ b/backend/resources/migrations/postgres/20260304090200_indexes.up.sql @@ -0,0 +1,6 @@ +CREATE INDEX IF NOT EXISTS idx_webauthn_sessions_expires_at ON webauthn_sessions (expires_at); +CREATE INDEX IF NOT EXISTS idx_one_time_access_tokens_expires_at ON one_time_access_tokens (expires_at); +CREATE INDEX IF NOT EXISTS idx_oidc_authorization_codes_expires_at ON oidc_authorization_codes (expires_at); +CREATE INDEX IF NOT EXISTS idx_oidc_refresh_tokens_expires_at ON oidc_refresh_tokens (expires_at); +CREATE INDEX IF NOT EXISTS idx_reauthentication_tokens_expires_at ON reauthentication_tokens (expires_at); +CREATE INDEX IF NOT EXISTS 
idx_email_verification_tokens_expires_at ON email_verification_tokens (expires_at); \ No newline at end of file diff --git a/backend/resources/migrations/sqlite/20260304090200_indexes.down.sql b/backend/resources/migrations/sqlite/20260304090200_indexes.down.sql new file mode 100644 index 00000000..f8e19576 --- /dev/null +++ b/backend/resources/migrations/sqlite/20260304090200_indexes.down.sql @@ -0,0 +1 @@ +-- No-op \ No newline at end of file diff --git a/backend/resources/migrations/sqlite/20260304090200_indexes.up.sql b/backend/resources/migrations/sqlite/20260304090200_indexes.up.sql new file mode 100644 index 00000000..f8a32142 --- /dev/null +++ b/backend/resources/migrations/sqlite/20260304090200_indexes.up.sql @@ -0,0 +1,12 @@ +PRAGMA foreign_keys=OFF; +BEGIN; + +CREATE INDEX IF NOT EXISTS idx_webauthn_sessions_expires_at ON webauthn_sessions (expires_at); +CREATE INDEX IF NOT EXISTS idx_one_time_access_tokens_expires_at ON one_time_access_tokens (expires_at); +CREATE INDEX IF NOT EXISTS idx_oidc_authorization_codes_expires_at ON oidc_authorization_codes (expires_at); +CREATE INDEX IF NOT EXISTS idx_oidc_refresh_tokens_expires_at ON oidc_refresh_tokens (expires_at); +CREATE INDEX IF NOT EXISTS idx_reauthentication_tokens_expires_at ON reauthentication_tokens (expires_at); +CREATE INDEX IF NOT EXISTS idx_email_verification_tokens_expires_at ON email_verification_tokens (expires_at); + +COMMIT; +PRAGMA foreign_keys=ON; diff --git a/email-templates/emails/api-key-expiring-soon.tsx b/email-templates/emails/api-key-expiring-soon.tsx index 6ddcc327..2a33987d 100644 --- a/email-templates/emails/api-key-expiring-soon.tsx +++ b/email-templates/emails/api-key-expiring-soon.tsx @@ -40,7 +40,7 @@ ApiKeyExpiringEmail.TemplateProps = { ...sharedTemplateProps, data: { name: "{{.Data.Name}}", - apiKeyName: "{{.Data.APIKeyName}}", + apiKeyName: "{{.Data.ApiKeyName}}", expiresAt: '{{.Data.ExpiresAt.Format "2006-01-02 15:04:05 MST"}}', }, };