diff --git a/backend/internal/job/analytics_job.go b/backend/internal/job/analytics_job.go
index f67e2042..c2e658df 100644
--- a/backend/internal/job/analytics_job.go
+++ b/backend/internal/job/analytics_job.go
@@ -28,7 +28,7 @@ func (s *Scheduler) RegisterAnalyticsJob(ctx context.Context, appConfig *service
appConfig: appConfig,
httpClient: httpClient,
}
- return s.RegisterJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, true)
+ return s.RegisterJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, service.RegisterJobOpts{RunImmediately: true})
}
type AnalyticsJob struct {
diff --git a/backend/internal/job/api_key_expiry_job.go b/backend/internal/job/api_key_expiry_job.go
index 6524089b..2481010e 100644
--- a/backend/internal/job/api_key_expiry_job.go
+++ b/backend/internal/job/api_key_expiry_job.go
@@ -22,7 +22,7 @@ func (s *Scheduler) RegisterApiKeyExpiryJob(ctx context.Context, apiKeyService *
}
// Send every day at midnight
- return s.RegisterJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, false)
+ return s.RegisterJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, service.RegisterJobOpts{})
}
func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) error {
@@ -42,7 +42,11 @@ func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) err
}
err = j.apiKeyService.SendApiKeyExpiringSoonEmail(ctx, key)
if err != nil {
- slog.ErrorContext(ctx, "Failed to send expiring API key notification email", slog.String("key", key.ID), slog.Any("error", err))
+ slog.ErrorContext(ctx, "Failed to send expiring API key notification email",
+ slog.String("key", key.ID),
+ slog.String("user", key.User.ID),
+ slog.Any("error", err),
+ )
}
}
return nil
diff --git a/backend/internal/job/db_cleanup_job.go b/backend/internal/job/db_cleanup_job.go
index fca96516..4872fae1 100644
--- a/backend/internal/job/db_cleanup_job.go
+++ b/backend/internal/job/db_cleanup_job.go
@@ -7,28 +7,37 @@ import (
"log/slog"
"time"
- "github.com/go-co-op/gocron/v2"
+ backoff "github.com/cenkalti/backoff/v5"
"gorm.io/gorm"
"github.com/pocket-id/pocket-id/backend/internal/common"
"github.com/pocket-id/pocket-id/backend/internal/model"
datatype "github.com/pocket-id/pocket-id/backend/internal/model/types"
+ "github.com/pocket-id/pocket-id/backend/internal/service"
)
func (s *Scheduler) RegisterDbCleanupJobs(ctx context.Context, db *gorm.DB) error {
jobs := &DbCleanupJobs{db: db}
- // Run every 24 hours (but with some jitter so they don't run at the exact same time), and now
- def := gocron.DurationRandomJob(24*time.Hour-2*time.Minute, 24*time.Hour+2*time.Minute)
+ newBackOff := func() *backoff.ExponentialBackOff {
+ bo := backoff.NewExponentialBackOff()
+ bo.Multiplier = 4
+ bo.RandomizationFactor = 0.1
+ bo.InitialInterval = time.Second
+ bo.MaxInterval = 45 * time.Second
+ return bo
+ }
+
+ // Use exponential backoff for each DB cleanup job so transient query failures are retried automatically rather than causing an immediate job failure
return errors.Join(
- s.RegisterJob(ctx, "ClearWebauthnSessions", def, jobs.clearWebauthnSessions, true),
- s.RegisterJob(ctx, "ClearOneTimeAccessTokens", def, jobs.clearOneTimeAccessTokens, true),
- s.RegisterJob(ctx, "ClearSignupTokens", def, jobs.clearSignupTokens, true),
- s.RegisterJob(ctx, "ClearEmailVerificationTokens", def, jobs.clearEmailVerificationTokens, true),
- s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", def, jobs.clearOidcAuthorizationCodes, true),
- s.RegisterJob(ctx, "ClearOidcRefreshTokens", def, jobs.clearOidcRefreshTokens, true),
- s.RegisterJob(ctx, "ClearReauthenticationTokens", def, jobs.clearReauthenticationTokens, true),
- s.RegisterJob(ctx, "ClearAuditLogs", def, jobs.clearAuditLogs, true),
+ s.RegisterJob(ctx, "ClearWebauthnSessions", jobDefWithJitter(24*time.Hour), jobs.clearWebauthnSessions, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}),
+ s.RegisterJob(ctx, "ClearOneTimeAccessTokens", jobDefWithJitter(24*time.Hour), jobs.clearOneTimeAccessTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}),
+ s.RegisterJob(ctx, "ClearSignupTokens", jobDefWithJitter(24*time.Hour), jobs.clearSignupTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}),
+ s.RegisterJob(ctx, "ClearEmailVerificationTokens", jobDefWithJitter(24*time.Hour), jobs.clearEmailVerificationTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}),
+ s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", jobDefWithJitter(24*time.Hour), jobs.clearOidcAuthorizationCodes, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}),
+ s.RegisterJob(ctx, "ClearOidcRefreshTokens", jobDefWithJitter(24*time.Hour), jobs.clearOidcRefreshTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}),
+ s.RegisterJob(ctx, "ClearReauthenticationTokens", jobDefWithJitter(24*time.Hour), jobs.clearReauthenticationTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}),
+ s.RegisterJob(ctx, "ClearAuditLogs", jobDefWithJitter(24*time.Hour), jobs.clearAuditLogs, service.RegisterJobOpts{RunImmediately: true, BackOff: newBackOff()}),
)
}
diff --git a/backend/internal/job/file_cleanup_job.go b/backend/internal/job/file_cleanup_job.go
index 2b141dac..71f0dd34 100644
--- a/backend/internal/job/file_cleanup_job.go
+++ b/backend/internal/job/file_cleanup_job.go
@@ -13,20 +13,26 @@ import (
"gorm.io/gorm"
"github.com/pocket-id/pocket-id/backend/internal/model"
+ "github.com/pocket-id/pocket-id/backend/internal/service"
"github.com/pocket-id/pocket-id/backend/internal/storage"
)
func (s *Scheduler) RegisterFileCleanupJobs(ctx context.Context, db *gorm.DB, fileStorage storage.FileStorage) error {
jobs := &FileCleanupJobs{db: db, fileStorage: fileStorage}
- err := s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, false)
+ var errs []error
+ errs = append(errs,
+ s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, service.RegisterJobOpts{}),
+ )
// Only necessary for file system storage
if fileStorage.Type() == storage.TypeFileSystem {
- err = errors.Join(err, s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, true))
+ errs = append(errs,
+ s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, service.RegisterJobOpts{RunImmediately: true}),
+ )
}
- return err
+ return errors.Join(errs...)
}
type FileCleanupJobs struct {
@@ -68,7 +74,8 @@ func (j *FileCleanupJobs) clearUnusedDefaultProfilePictures(ctx context.Context)
// If these initials aren't used by any user, delete the file
if _, ok := initialsInUse[initials]; !ok {
filePath := path.Join(defaultPicturesDir, filename)
- if err := j.fileStorage.Delete(ctx, filePath); err != nil {
+ err = j.fileStorage.Delete(ctx, filePath)
+ if err != nil {
slog.ErrorContext(ctx, "Failed to delete unused default profile picture", slog.String("path", filePath), slog.Any("error", err))
} else {
filesDeleted++
@@ -95,8 +102,9 @@ func (j *FileCleanupJobs) clearOrphanedTempFiles(ctx context.Context) error {
return nil
}
- if err := j.fileStorage.Delete(ctx, p.Path); err != nil {
- slog.ErrorContext(ctx, "Failed to delete temp file", slog.String("path", p.Path), slog.Any("error", err))
+ rErr := j.fileStorage.Delete(ctx, p.Path)
+ if rErr != nil {
+ slog.ErrorContext(ctx, "Failed to delete temp file", slog.String("path", p.Path), slog.Any("error", rErr))
return nil
}
deleted++
diff --git a/backend/internal/job/geoloite_update_job.go b/backend/internal/job/geoloite_update_job.go
index 65353757..7b163a8d 100644
--- a/backend/internal/job/geoloite_update_job.go
+++ b/backend/internal/job/geoloite_update_job.go
@@ -23,7 +23,7 @@ func (s *Scheduler) RegisterGeoLiteUpdateJobs(ctx context.Context, geoLiteServic
jobs := &GeoLiteUpdateJobs{geoLiteService: geoLiteService}
// Run every 24 hours (and right away)
- return s.RegisterJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, true)
+ return s.RegisterJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, service.RegisterJobOpts{RunImmediately: true})
}
func (j *GeoLiteUpdateJobs) updateGoeLiteDB(ctx context.Context) error {
diff --git a/backend/internal/job/ldap_job.go b/backend/internal/job/ldap_job.go
index 33646860..1547d954 100644
--- a/backend/internal/job/ldap_job.go
+++ b/backend/internal/job/ldap_job.go
@@ -4,8 +4,6 @@ import (
"context"
"time"
- "github.com/go-co-op/gocron/v2"
-
"github.com/pocket-id/pocket-id/backend/internal/service"
)
@@ -17,8 +15,8 @@ type LdapJobs struct {
func (s *Scheduler) RegisterLdapJobs(ctx context.Context, ldapService *service.LdapService, appConfigService *service.AppConfigService) error {
jobs := &LdapJobs{ldapService: ldapService, appConfigService: appConfigService}
- // Register the job to run every hour
- return s.RegisterJob(ctx, "SyncLdap", gocron.DurationJob(time.Hour), jobs.syncLdap, true)
+ // Register the job to run every hour (with some jitter)
+ return s.RegisterJob(ctx, "SyncLdap", jobDefWithJitter(time.Hour), jobs.syncLdap, service.RegisterJobOpts{RunImmediately: true})
}
func (j *LdapJobs) syncLdap(ctx context.Context) error {
diff --git a/backend/internal/job/scheduler.go b/backend/internal/job/scheduler.go
index 2a48c2a8..2ef2019b 100644
--- a/backend/internal/job/scheduler.go
+++ b/backend/internal/job/scheduler.go
@@ -5,9 +5,13 @@ import (
"errors"
"fmt"
"log/slog"
+ "time"
+ backoff "github.com/cenkalti/backoff/v5"
"github.com/go-co-op/gocron/v2"
"github.com/google/uuid"
+
+ "github.com/pocket-id/pocket-id/backend/internal/service"
)
type Scheduler struct {
@@ -33,16 +37,12 @@ func (s *Scheduler) RemoveJob(name string) error {
if job.Name() == name {
err := s.scheduler.RemoveJob(job.ID())
if err != nil {
- errs = append(errs, fmt.Errorf("failed to unqueue job %q with ID %q: %w", name, job.ID().String(), err))
+ errs = append(errs, fmt.Errorf("failed to dequeue job %q with ID %q: %w", name, job.ID().String(), err))
}
}
}
- if len(errs) > 0 {
- return errors.Join(errs...)
- }
-
- return nil
+ return errors.Join(errs...)
}
// Run the scheduler.
@@ -64,7 +64,29 @@ func (s *Scheduler) Run(ctx context.Context) error {
return nil
}
-func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool, extraOptions ...gocron.JobOption) error {
+func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, jobFn func(ctx context.Context) error, opts service.RegisterJobOpts) error {
+ // If a BackOff strategy is provided, wrap the job with retry logic
+ if opts.BackOff != nil {
+ origJob := jobFn
+ jobFn = func(ctx context.Context) error {
+ _, err := backoff.Retry(
+ ctx,
+ func() (struct{}, error) {
+ return struct{}{}, origJob(ctx)
+ },
+ backoff.WithBackOff(opts.BackOff),
+ backoff.WithNotify(func(err error, d time.Duration) {
+ slog.WarnContext(ctx, "Job failed, retrying",
+ slog.String("name", name),
+ slog.Any("error", err),
+ slog.Duration("retryIn", d),
+ )
+ }),
+ )
+ return err
+ }
+ }
+
jobOptions := []gocron.JobOption{
gocron.WithContext(ctx),
gocron.WithName(name),
@@ -91,13 +113,13 @@ func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.Job
),
}
- if runImmediately {
+ if opts.RunImmediately {
jobOptions = append(jobOptions, gocron.JobOption(gocron.WithStartImmediately()))
}
- jobOptions = append(jobOptions, extraOptions...)
+ jobOptions = append(jobOptions, opts.ExtraOptions...)
- _, err := s.scheduler.NewJob(def, gocron.NewTask(job), jobOptions...)
+ _, err := s.scheduler.NewJob(def, gocron.NewTask(jobFn), jobOptions...)
if err != nil {
return fmt.Errorf("failed to register job %q: %w", name, err)
@@ -105,3 +127,9 @@ func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.Job
return nil
}
+
+func jobDefWithJitter(interval time.Duration) gocron.JobDefinition {
+ const jitter = 5 * time.Minute
+
+ return gocron.DurationRandomJob(interval-jitter, interval+jitter)
+}
diff --git a/backend/internal/job/scim_job.go b/backend/internal/job/scim_job.go
index 1ea8ee96..5c4336f6 100644
--- a/backend/internal/job/scim_job.go
+++ b/backend/internal/job/scim_job.go
@@ -16,8 +16,8 @@ type ScimJobs struct {
func (s *Scheduler) RegisterScimJobs(ctx context.Context, scimService *service.ScimService) error {
jobs := &ScimJobs{scimService: scimService}
- // Register the job to run every hour
- return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, true)
+ // Register the job to run every hour
+ return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, service.RegisterJobOpts{RunImmediately: true})
}
func (j *ScimJobs) SyncScim(ctx context.Context) error {
diff --git a/backend/internal/service/api_key_service.go b/backend/internal/service/api_key_service.go
index 3c29d6e9..cb409ec5 100644
--- a/backend/internal/service/api_key_service.go
+++ b/backend/internal/service/api_key_service.go
@@ -3,6 +3,7 @@ package service
import (
"context"
"errors"
+ "fmt"
"time"
datatype "github.com/pocket-id/pocket-id/backend/internal/model/types"
@@ -205,36 +206,33 @@ func (s *ApiKeyService) ListExpiringApiKeys(ctx context.Context, daysAhead int)
}
func (s *ApiKeyService) SendApiKeyExpiringSoonEmail(ctx context.Context, apiKey model.ApiKey) error {
- user := apiKey.User
-
- if user.ID == "" {
- if err := s.db.WithContext(ctx).First(&user, "id = ?", apiKey.UserID).Error; err != nil {
- return err
- }
- }
-
- if user.Email == nil {
+ if apiKey.User.Email == nil {
return &common.UserEmailNotSetError{}
}
err := SendEmail(ctx, s.emailService, email.Address{
- Name: user.FullName(),
- Email: *user.Email,
+ Name: apiKey.User.FullName(),
+ Email: *apiKey.User.Email,
}, ApiKeyExpiringSoonTemplate, &ApiKeyExpiringSoonTemplateData{
ApiKeyName: apiKey.Name,
ExpiresAt: apiKey.ExpiresAt.ToTime(),
- Name: user.FirstName,
+ Name: apiKey.User.FirstName,
})
if err != nil {
- return err
+ return fmt.Errorf("error sending notification email: %w", err)
}
// Mark the API key as having had an expiration email sent
- return s.db.WithContext(ctx).
+ err = s.db.WithContext(ctx).
Model(&model.ApiKey{}).
Where("id = ?", apiKey.ID).
Update("expiration_email_sent", true).
Error
+ if err != nil {
+ return fmt.Errorf("error recording expiration sent email in database: %w", err)
+ }
+
+ return nil
}
func (s *ApiKeyService) initStaticApiKeyUser(ctx context.Context) (user model.User, err error) {
diff --git a/backend/internal/service/app_lock_service.go b/backend/internal/service/app_lock_service.go
index 339e41cd..0a309b15 100644
--- a/backend/internal/service/app_lock_service.go
+++ b/backend/internal/service/app_lock_service.go
@@ -73,7 +73,10 @@ func (lv *lockValue) Unmarshal(raw string) error {
// Acquire obtains the lock. When force is true, the lock is stolen from any existing owner.
// If the lock is forcefully acquired, it blocks until the previous lock has expired.
func (s *AppLockService) Acquire(ctx context.Context, force bool) (waitUntil time.Time, err error) {
- tx := s.db.Begin()
+ tx := s.db.WithContext(ctx).Begin()
+ if tx.Error != nil {
+ return time.Time{}, fmt.Errorf("begin lock transaction: %w", tx.Error)
+ }
defer func() {
tx.Rollback()
}()
@@ -174,7 +177,8 @@ func (s *AppLockService) RunRenewal(ctx context.Context) error {
case <-ctx.Done():
return nil
case <-ticker.C:
- if err := s.renew(ctx); err != nil {
+ err := s.renew(ctx)
+ if err != nil {
return fmt.Errorf("renew lock: %w", err)
}
}
@@ -183,33 +187,43 @@ func (s *AppLockService) RunRenewal(ctx context.Context) error {
// Release releases the lock if it is held by this process.
func (s *AppLockService) Release(ctx context.Context) error {
- opCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
- defer cancel()
+ db, err := s.db.DB()
+ if err != nil {
+ return fmt.Errorf("failed to get DB connection: %w", err)
+ }
var query string
switch s.db.Name() {
case "sqlite":
query = `
- DELETE FROM kv
- WHERE key = ?
- AND json_extract(value, '$.lock_id') = ?
- `
+DELETE FROM kv
+WHERE key = ?
+ AND json_extract(value, '$.lock_id') = ?
+`
case "postgres":
query = `
- DELETE FROM kv
- WHERE key = $1
- AND value::json->>'lock_id' = $2
- `
+DELETE FROM kv
+WHERE key = $1
+ AND value::json->>'lock_id' = $2
+`
default:
return fmt.Errorf("unsupported database dialect: %s", s.db.Name())
}
- res := s.db.WithContext(opCtx).Exec(query, lockKey, s.lockID)
- if res.Error != nil {
- return fmt.Errorf("release lock failed: %w", res.Error)
+ opCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
+ defer cancel()
+
+ res, err := db.ExecContext(opCtx, query, lockKey, s.lockID)
+ if err != nil {
+ return fmt.Errorf("release lock failed: %w", err)
}
- if res.RowsAffected == 0 {
+ count, err := res.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("failed to count affected rows: %w", err)
+ }
+
+ if count == 0 {
slog.Warn("Application lock not held by this process, cannot release",
slog.Int64("process_id", s.processID),
slog.String("host_id", s.hostID),
@@ -225,6 +239,11 @@ func (s *AppLockService) Release(ctx context.Context) error {
// renew tries to renew the lock, retrying up to renewRetries times (sleeping 1s between attempts).
func (s *AppLockService) renew(ctx context.Context) error {
+ db, err := s.db.DB()
+ if err != nil {
+ return fmt.Errorf("failed to get DB connection: %w", err)
+ }
+
var lastErr error
for attempt := 1; attempt <= renewRetries; attempt++ {
now := time.Now()
@@ -246,42 +265,56 @@ func (s *AppLockService) renew(ctx context.Context) error {
switch s.db.Name() {
case "sqlite":
query = `
- UPDATE kv
- SET value = ?
- WHERE key = ?
- AND json_extract(value, '$.lock_id') = ?
- AND json_extract(value, '$.expires_at') > ?
- `
+UPDATE kv
+SET value = ?
+WHERE key = ?
+ AND json_extract(value, '$.lock_id') = ?
+ AND json_extract(value, '$.expires_at') > ?
+`
case "postgres":
query = `
- UPDATE kv
- SET value = $1
- WHERE key = $2
- AND value::json->>'lock_id' = $3
- AND ((value::json->>'expires_at')::bigint > $4)
- `
+UPDATE kv
+SET value = $1
+WHERE key = $2
+ AND value::json->>'lock_id' = $3
+ AND ((value::json->>'expires_at')::bigint > $4)
+`
default:
return fmt.Errorf("unsupported database dialect: %s", s.db.Name())
}
opCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
- res := s.db.WithContext(opCtx).Exec(query, raw, lockKey, s.lockID, nowUnix)
+ res, err := db.ExecContext(opCtx, query, raw, lockKey, s.lockID, nowUnix)
cancel()
- switch {
- case res.Error != nil:
- lastErr = fmt.Errorf("lock renewal failed: %w", res.Error)
- case res.RowsAffected == 0:
- // Must be after checking res.Error
- return ErrLockLost
- default:
+ // Query succeeded, but may have updated 0 rows
+ if err == nil {
+ count, err := res.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("failed to count affected rows: %w", err)
+ }
+
+ // If no rows were updated, we lost the lock
+ if count == 0 {
+ return ErrLockLost
+ }
+
+ // All good
slog.Debug("Renewed application lock",
slog.Int64("process_id", s.processID),
slog.String("host_id", s.hostID),
+ slog.Duration("duration", time.Since(now)),
)
return nil
}
+ // If we're here, we have an error that can be retried
+ slog.Debug("Application lock renewal attempt failed",
+ slog.Any("error", err),
+ slog.Duration("duration", time.Since(now)),
+ )
+ lastErr = fmt.Errorf("lock renewal failed: %w", err)
+
// Wait before next attempt or cancel if context is done
if attempt < renewRetries {
select {
diff --git a/backend/internal/service/app_lock_service_test.go b/backend/internal/service/app_lock_service_test.go
index 95b22f51..8f829dff 100644
--- a/backend/internal/service/app_lock_service_test.go
+++ b/backend/internal/service/app_lock_service_test.go
@@ -49,6 +49,23 @@ func readLockValue(t *testing.T, db *gorm.DB) lockValue {
return value
}
+func lockDatabaseForWrite(t *testing.T, db *gorm.DB) *gorm.DB {
+ t.Helper()
+
+ tx := db.Begin()
+ require.NoError(t, tx.Error)
+
+ // Keep a write transaction open to block other queries.
+ err := tx.Exec(
+ `INSERT INTO kv (key, value) VALUES (?, ?) ON CONFLICT(key) DO NOTHING`,
+ lockKey,
+ `{"expires_at":0}`,
+ ).Error
+ require.NoError(t, err)
+
+ return tx
+}
+
func TestAppLockServiceAcquire(t *testing.T) {
t.Run("creates new lock when none exists", func(t *testing.T) {
db := testutils.NewDatabaseForTest(t)
@@ -99,6 +116,66 @@ func TestAppLockServiceAcquire(t *testing.T) {
require.Equal(t, service.hostID, stored.HostID)
require.Greater(t, stored.ExpiresAt, time.Now().Unix())
})
+
+ t.Run("force acquisition returns wait duration when stealing active lock", func(t *testing.T) {
+ db := testutils.NewDatabaseForTest(t)
+ service := newTestAppLockService(t, db)
+
+ existing := lockValue{
+ ProcessID: 99,
+ HostID: "other-host",
+ LockID: "other-lock-id",
+ ExpiresAt: time.Now().Add(ttl).Unix(),
+ }
+ insertLock(t, db, existing)
+
+ waitUntil, err := service.Acquire(context.Background(), true)
+ require.NoError(t, err)
+ require.WithinDuration(t, time.Unix(existing.ExpiresAt, 0), waitUntil, time.Second)
+ })
+
+ t.Run("force acquisition does not wait when lock id is unchanged", func(t *testing.T) {
+ db := testutils.NewDatabaseForTest(t)
+ service := newTestAppLockService(t, db)
+
+ insertLock(t, db, lockValue{
+ ProcessID: 99,
+ HostID: "other-host",
+ LockID: service.lockID,
+ ExpiresAt: time.Now().Add(ttl).Unix(),
+ })
+
+ waitUntil, err := service.Acquire(context.Background(), true)
+ require.NoError(t, err)
+ require.True(t, waitUntil.IsZero())
+ })
+
+ t.Run("returns error when existing lock value is invalid JSON", func(t *testing.T) {
+ db := testutils.NewDatabaseForTest(t)
+ service := newTestAppLockService(t, db)
+
+ raw := "this-is-not-json"
+ err := db.Create(&model.KV{Key: lockKey, Value: &raw}).Error
+ require.NoError(t, err)
+
+ _, err = service.Acquire(context.Background(), false)
+ require.ErrorContains(t, err, "decode existing lock value")
+ })
+
+ t.Run("returns context deadline exceeded when database is locked", func(t *testing.T) {
+ db := testutils.NewDatabaseForTest(t)
+ service := newTestAppLockService(t, db)
+
+ tx := lockDatabaseForWrite(t, db)
+ defer tx.Rollback()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
+ defer cancel()
+
+ _, err := service.Acquire(ctx, false)
+ require.ErrorIs(t, err, context.DeadlineExceeded)
+ require.ErrorContains(t, err, "begin lock transaction")
+ })
}
func TestAppLockServiceRelease(t *testing.T) {
@@ -134,6 +211,24 @@ func TestAppLockServiceRelease(t *testing.T) {
stored := readLockValue(t, db)
require.Equal(t, existing, stored)
})
+
+ t.Run("returns context deadline exceeded when database is locked", func(t *testing.T) {
+ db := testutils.NewDatabaseForTest(t)
+ service := newTestAppLockService(t, db)
+
+ _, err := service.Acquire(context.Background(), false)
+ require.NoError(t, err)
+
+ tx := lockDatabaseForWrite(t, db)
+ defer tx.Rollback()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
+ defer cancel()
+
+ err = service.Release(ctx)
+ require.ErrorIs(t, err, context.DeadlineExceeded)
+ require.ErrorContains(t, err, "release lock failed")
+ })
}
func TestAppLockServiceRenew(t *testing.T) {
@@ -186,4 +281,21 @@ func TestAppLockServiceRenew(t *testing.T) {
err = service.renew(context.Background())
require.ErrorIs(t, err, ErrLockLost)
})
+
+ t.Run("returns context deadline exceeded when database is locked", func(t *testing.T) {
+ db := testutils.NewDatabaseForTest(t)
+ service := newTestAppLockService(t, db)
+
+ _, err := service.Acquire(context.Background(), false)
+ require.NoError(t, err)
+
+ tx := lockDatabaseForWrite(t, db)
+ defer tx.Rollback()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
+ defer cancel()
+
+ err = service.renew(ctx)
+ require.ErrorIs(t, err, context.DeadlineExceeded)
+ })
}
diff --git a/backend/internal/service/email_service.go b/backend/internal/service/email_service.go
index 05affa5b..7dffa08c 100644
--- a/backend/internal/service/email_service.go
+++ b/backend/internal/service/email_service.go
@@ -150,7 +150,8 @@ func SendEmail[V any](ctx context.Context, srv *EmailService, toEmail email.Addr
}
// Send the email
- if err := srv.sendEmailContent(client, toEmail, c); err != nil {
+ err = srv.sendEmailContent(client, toEmail, c)
+ if err != nil {
return fmt.Errorf("send email content: %w", err)
}
diff --git a/backend/internal/service/scheduler.go b/backend/internal/service/scheduler.go
new file mode 100644
index 00000000..b53f195b
--- /dev/null
+++ b/backend/internal/service/scheduler.go
@@ -0,0 +1,25 @@
+package service
+
+import (
+ "context"
+
+ backoff "github.com/cenkalti/backoff/v5"
+ "github.com/go-co-op/gocron/v2"
+)
+
+// RegisterJobOpts holds optional configuration for registering a scheduled job.
+type RegisterJobOpts struct {
+ // RunImmediately runs the job immediately after registration.
+ RunImmediately bool
+ // ExtraOptions are additional gocron job options.
+ ExtraOptions []gocron.JobOption
+ // BackOff is an optional backoff strategy. If non-nil, each failed run of the
+ // job is automatically retried using the provided backoff.
+ BackOff backoff.BackOff
+}
+
+// Scheduler is an interface for registering and managing background jobs.
+type Scheduler interface {
+ RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, opts RegisterJobOpts) error
+ RemoveJob(name string) error
+}
diff --git a/backend/internal/service/scim_service.go b/backend/internal/service/scim_service.go
index 976e2cd5..f21f8be7 100644
--- a/backend/internal/service/scim_service.go
+++ b/backend/internal/service/scim_service.go
@@ -34,11 +34,6 @@ const scimErrorBodyLimit = 4096
type scimSyncAction int
-type Scheduler interface {
- RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool, extraOptions ...gocron.JobOption) error
- RemoveJob(name string) error
-}
-
const (
scimActionNone scimSyncAction = iota
scimActionCreated
@@ -149,7 +144,7 @@ func (s *ScimService) ScheduleSync() {
err := s.scheduler.RegisterJob(
context.Background(), jobName,
- gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start)), s.SyncAll, false)
+ gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start)), s.SyncAll, RegisterJobOpts{})
if err != nil {
slog.Error("Failed to schedule SCIM sync", slog.Any("error", err))
@@ -168,7 +163,8 @@ func (s *ScimService) SyncAll(ctx context.Context) error {
errs = append(errs, ctx.Err())
break
}
- if err := s.SyncServiceProvider(ctx, provider.ID); err != nil {
+ err = s.SyncServiceProvider(ctx, provider.ID)
+ if err != nil {
errs = append(errs, fmt.Errorf("failed to sync SCIM provider %s: %w", provider.ID, err))
}
}
@@ -210,26 +206,20 @@ func (s *ScimService) SyncServiceProvider(ctx context.Context, serviceProviderID
}
var errs []error
- var userStats scimSyncStats
- var groupStats scimSyncStats
// Sync users first, so that groups can reference them
- if stats, err := s.syncUsers(ctx, provider, users, &userResources); err != nil {
- errs = append(errs, err)
- userStats = stats
- } else {
- userStats = stats
- }
-
- stats, err := s.syncGroups(ctx, provider, groups, groupResources.Resources, userResources.Resources)
+ userStats, err := s.syncUsers(ctx, provider, users, &userResources)
+ if err != nil {
+ errs = append(errs, err)
+ }
+
+ groupStats, err := s.syncGroups(ctx, provider, groups, groupResources.Resources, userResources.Resources)
if err != nil {
errs = append(errs, err)
- groupStats = stats
- } else {
- groupStats = stats
}
if len(errs) > 0 {
+ err = errors.Join(errs...)
slog.WarnContext(ctx, "SCIM sync completed with errors",
slog.String("provider_id", provider.ID),
slog.Int("error_count", len(errs)),
@@ -240,12 +230,14 @@ func (s *ScimService) SyncServiceProvider(ctx context.Context, serviceProviderID
slog.Int("groups_updated", groupStats.Updated),
slog.Int("groups_deleted", groupStats.Deleted),
slog.Duration("duration", time.Since(start)),
+ slog.Any("error", err),
)
- return errors.Join(errs...)
+ return err
}
provider.LastSyncedAt = new(datatype.DateTime(time.Now()))
- if err := s.db.WithContext(ctx).Save(&provider).Error; err != nil {
+ err = s.db.WithContext(ctx).Save(&provider).Error
+ if err != nil {
return err
}
@@ -273,7 +265,7 @@ func (s *ScimService) syncUsers(
// Update or create users
for _, u := range users {
- existing := getResourceByExternalID[dto.ScimUser](u.ID, resourceList.Resources)
+ existing := getResourceByExternalID(u.ID, resourceList.Resources)
action, created, err := s.syncUser(ctx, provider, u, existing)
if created != nil && existing == nil {
@@ -434,7 +426,7 @@ func (s *ScimService) syncGroup(
// Prepare group members
members := make([]dto.ScimGroupMember, len(group.Users))
for i, user := range group.Users {
- userResource := getResourceByExternalID[dto.ScimUser](user.ID, userResources)
+ userResource := getResourceByExternalID(user.ID, userResources)
if userResource == nil {
// Groups depend on user IDs already being provisioned
return scimActionNone, fmt.Errorf("cannot sync group %s: user %s is not provisioned in SCIM provider", group.ID, user.ID)
diff --git a/backend/resources/email-templates/api-key-expiring-soon_html.tmpl b/backend/resources/email-templates/api-key-expiring-soon_html.tmpl
index 8b52a5a0..b9b3bb5c 100644
--- a/backend/resources/email-templates/api-key-expiring-soon_html.tmpl
+++ b/backend/resources/email-templates/api-key-expiring-soon_html.tmpl
@@ -1 +1 @@
-{{define "root"}}
{{.AppName}}
API Key Expiring Soon
Warning
Hello {{.Data.Name}}, This is a reminder that your API key {{.Data.APIKeyName}} will expire on {{.Data.ExpiresAt.Format "2006-01-02 15:04:05 MST"}}.
Please generate a new API key if you need continued access.
{{end}}
\ No newline at end of file
+{{define "root"}}
{{.AppName}}
API Key Expiring Soon
Warning
Hello {{.Data.Name}}, This is a reminder that your API key {{.Data.ApiKeyName}} will expire on {{.Data.ExpiresAt.Format "2006-01-02 15:04:05 MST"}}.
Please generate a new API key if you need continued access.
{{end}}
\ No newline at end of file
diff --git a/backend/resources/email-templates/api-key-expiring-soon_text.tmpl b/backend/resources/email-templates/api-key-expiring-soon_text.tmpl
index ae7ba74b..247969d5 100644
--- a/backend/resources/email-templates/api-key-expiring-soon_text.tmpl
+++ b/backend/resources/email-templates/api-key-expiring-soon_text.tmpl
@@ -6,6 +6,6 @@ API KEY EXPIRING SOON
Warning
Hello {{.Data.Name}},
-This is a reminder that your API key {{.Data.APIKeyName}} will expire on {{.Data.ExpiresAt.Format "2006-01-02 15:04:05 MST"}}.
+This is a reminder that your API key {{.Data.ApiKeyName}} will expire on {{.Data.ExpiresAt.Format "2006-01-02 15:04:05 MST"}}.
Please generate a new API key if you need continued access.{{end}}
\ No newline at end of file
diff --git a/backend/resources/migrations/postgres/20260304090200_indexes.down.sql b/backend/resources/migrations/postgres/20260304090200_indexes.down.sql
new file mode 100644
index 00000000..f8e19576
--- /dev/null
+++ b/backend/resources/migrations/postgres/20260304090200_indexes.down.sql
@@ -0,0 +1 @@
+-- No-op
\ No newline at end of file
diff --git a/backend/resources/migrations/postgres/20260304090200_indexes.up.sql b/backend/resources/migrations/postgres/20260304090200_indexes.up.sql
new file mode 100644
index 00000000..b288f044
--- /dev/null
+++ b/backend/resources/migrations/postgres/20260304090200_indexes.up.sql
@@ -0,0 +1,6 @@
+CREATE INDEX IF NOT EXISTS idx_webauthn_sessions_expires_at ON webauthn_sessions (expires_at);
+CREATE INDEX IF NOT EXISTS idx_one_time_access_tokens_expires_at ON one_time_access_tokens (expires_at);
+CREATE INDEX IF NOT EXISTS idx_oidc_authorization_codes_expires_at ON oidc_authorization_codes (expires_at);
+CREATE INDEX IF NOT EXISTS idx_oidc_refresh_tokens_expires_at ON oidc_refresh_tokens (expires_at);
+CREATE INDEX IF NOT EXISTS idx_reauthentication_tokens_expires_at ON reauthentication_tokens (expires_at);
+CREATE INDEX IF NOT EXISTS idx_email_verification_tokens_expires_at ON email_verification_tokens (expires_at);
\ No newline at end of file
diff --git a/backend/resources/migrations/sqlite/20260304090200_indexes.down.sql b/backend/resources/migrations/sqlite/20260304090200_indexes.down.sql
new file mode 100644
index 00000000..f8e19576
--- /dev/null
+++ b/backend/resources/migrations/sqlite/20260304090200_indexes.down.sql
@@ -0,0 +1 @@
+-- No-op
\ No newline at end of file
diff --git a/backend/resources/migrations/sqlite/20260304090200_indexes.up.sql b/backend/resources/migrations/sqlite/20260304090200_indexes.up.sql
new file mode 100644
index 00000000..f8a32142
--- /dev/null
+++ b/backend/resources/migrations/sqlite/20260304090200_indexes.up.sql
@@ -0,0 +1,12 @@
+PRAGMA foreign_keys= OFF;
+BEGIN;
+
+CREATE INDEX IF NOT EXISTS idx_webauthn_sessions_expires_at ON webauthn_sessions (expires_at);
+CREATE INDEX IF NOT EXISTS idx_one_time_access_tokens_expires_at ON one_time_access_tokens (expires_at);
+CREATE INDEX IF NOT EXISTS idx_oidc_authorization_codes_expires_at ON oidc_authorization_codes (expires_at);
+CREATE INDEX IF NOT EXISTS idx_oidc_refresh_tokens_expires_at ON oidc_refresh_tokens (expires_at);
+CREATE INDEX IF NOT EXISTS idx_reauthentication_tokens_expires_at ON reauthentication_tokens (expires_at);
+CREATE INDEX IF NOT EXISTS idx_email_verification_tokens_expires_at ON email_verification_tokens (expires_at);
+
+COMMIT;
+PRAGMA foreign_keys=ON;
diff --git a/email-templates/emails/api-key-expiring-soon.tsx b/email-templates/emails/api-key-expiring-soon.tsx
index 6ddcc327..2a33987d 100644
--- a/email-templates/emails/api-key-expiring-soon.tsx
+++ b/email-templates/emails/api-key-expiring-soon.tsx
@@ -40,7 +40,7 @@ ApiKeyExpiringEmail.TemplateProps = {
...sharedTemplateProps,
data: {
name: "{{.Data.Name}}",
- apiKeyName: "{{.Data.APIKeyName}}",
+ apiKeyName: "{{.Data.ApiKeyName}}",
expiresAt: '{{.Data.ExpiresAt.Format "2006-01-02 15:04:05 MST"}}',
},
};