From 6615ad64a41e26bf190f8d885da5fbbaf08193a5 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Wed, 4 Mar 2026 20:08:57 -0800 Subject: [PATCH] Ensure cleanup jobs have some jitter --- backend/internal/job/db_cleanup_job.go | 19 ++++++-------- backend/internal/job/file_cleanup_job.go | 19 +++++++++----- backend/internal/job/ldap_job.go | 6 ++--- backend/internal/job/scheduler.go | 15 ++++++----- backend/internal/job/scim_job.go | 4 +-- backend/internal/service/scim_service.go | 33 +++++++++++------------- 6 files changed, 49 insertions(+), 47 deletions(-) diff --git a/backend/internal/job/db_cleanup_job.go b/backend/internal/job/db_cleanup_job.go index fca96516..e5c1e085 100644 --- a/backend/internal/job/db_cleanup_job.go +++ b/backend/internal/job/db_cleanup_job.go @@ -7,7 +7,6 @@ import ( "log/slog" "time" - "github.com/go-co-op/gocron/v2" "gorm.io/gorm" "github.com/pocket-id/pocket-id/backend/internal/common" @@ -18,17 +17,15 @@ import ( func (s *Scheduler) RegisterDbCleanupJobs(ctx context.Context, db *gorm.DB) error { jobs := &DbCleanupJobs{db: db} - // Run every 24 hours (but with some jitter so they don't run at the exact same time), and now - def := gocron.DurationRandomJob(24*time.Hour-2*time.Minute, 24*time.Hour+2*time.Minute) return errors.Join( - s.RegisterJob(ctx, "ClearWebauthnSessions", def, jobs.clearWebauthnSessions, true), - s.RegisterJob(ctx, "ClearOneTimeAccessTokens", def, jobs.clearOneTimeAccessTokens, true), - s.RegisterJob(ctx, "ClearSignupTokens", def, jobs.clearSignupTokens, true), - s.RegisterJob(ctx, "ClearEmailVerificationTokens", def, jobs.clearEmailVerificationTokens, true), - s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", def, jobs.clearOidcAuthorizationCodes, true), - s.RegisterJob(ctx, "ClearOidcRefreshTokens", def, jobs.clearOidcRefreshTokens, true), - s.RegisterJob(ctx, "ClearReauthenticationTokens", def, jobs.clearReauthenticationTokens, true), - s.RegisterJob(ctx, "ClearAuditLogs", def, jobs.clearAuditLogs, true), + s.RegisterJob(ctx, "ClearWebauthnSessions", jobDefWithJitter(24*time.Hour), jobs.clearWebauthnSessions, true), + s.RegisterJob(ctx, "ClearOneTimeAccessTokens", jobDefWithJitter(24*time.Hour), jobs.clearOneTimeAccessTokens, true), + s.RegisterJob(ctx, "ClearSignupTokens", jobDefWithJitter(24*time.Hour), jobs.clearSignupTokens, true), + s.RegisterJob(ctx, "ClearEmailVerificationTokens", jobDefWithJitter(24*time.Hour), jobs.clearEmailVerificationTokens, true), + s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", jobDefWithJitter(24*time.Hour), jobs.clearOidcAuthorizationCodes, true), + s.RegisterJob(ctx, "ClearOidcRefreshTokens", jobDefWithJitter(24*time.Hour), jobs.clearOidcRefreshTokens, true), + s.RegisterJob(ctx, "ClearReauthenticationTokens", jobDefWithJitter(24*time.Hour), jobs.clearReauthenticationTokens, true), + s.RegisterJob(ctx, "ClearAuditLogs", jobDefWithJitter(24*time.Hour), jobs.clearAuditLogs, true), ) } diff --git a/backend/internal/job/file_cleanup_job.go b/backend/internal/job/file_cleanup_job.go index 2b141dac..2f70cb7c 100644 --- a/backend/internal/job/file_cleanup_job.go +++ b/backend/internal/job/file_cleanup_job.go @@ -19,14 +19,19 @@ import ( func (s *Scheduler) RegisterFileCleanupJobs(ctx context.Context, db *gorm.DB, fileStorage storage.FileStorage) error { jobs := &FileCleanupJobs{db: db, fileStorage: fileStorage} - err := s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, false) + var errs []error + errs = append(errs, + s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, false), + ) // Only necessary for file system storage if fileStorage.Type() == storage.TypeFileSystem { - err = errors.Join(err, s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, true)) + errs = append(errs, + s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, true), + ) } - return err + return errors.Join(errs...) } type FileCleanupJobs struct { @@ -68,7 +73,8 @@ func (j *FileCleanupJobs) clearUnusedDefaultProfilePictures(ctx context.Context) // If these initials aren't used by any user, delete the file if _, ok := initialsInUse[initials]; !ok { filePath := path.Join(defaultPicturesDir, filename) - if err := j.fileStorage.Delete(ctx, filePath); err != nil { + err = j.fileStorage.Delete(ctx, filePath) + if err != nil { slog.ErrorContext(ctx, "Failed to delete unused default profile picture", slog.String("path", filePath), slog.Any("error", err)) } else { filesDeleted++ @@ -95,8 +101,9 @@ func (j *FileCleanupJobs) clearOrphanedTempFiles(ctx context.Context) error { return nil } - if err := j.fileStorage.Delete(ctx, p.Path); err != nil { - slog.ErrorContext(ctx, "Failed to delete temp file", slog.String("path", p.Path), slog.Any("error", err)) + rErr := j.fileStorage.Delete(ctx, p.Path) + if rErr != nil { + slog.ErrorContext(ctx, "Failed to delete temp file", slog.String("path", p.Path), slog.Any("error", rErr)) return nil } deleted++ diff --git a/backend/internal/job/ldap_job.go b/backend/internal/job/ldap_job.go index 33646860..4ba7728e 100644 --- a/backend/internal/job/ldap_job.go +++ b/backend/internal/job/ldap_job.go @@ -4,8 +4,6 @@ import ( "context" "time" - "github.com/go-co-op/gocron/v2" - "github.com/pocket-id/pocket-id/backend/internal/service" ) @@ -17,8 +15,8 @@ type LdapJobs struct { func (s *Scheduler) RegisterLdapJobs(ctx context.Context, ldapService *service.LdapService, appConfigService *service.AppConfigService) error { jobs := &LdapJobs{ldapService: ldapService, appConfigService: appConfigService} - // Register the job to run every hour - return s.RegisterJob(ctx, "SyncLdap", gocron.DurationJob(time.Hour), jobs.syncLdap, true) + // Register the job to run every hour (with some jitter) + return s.RegisterJob(ctx, "SyncLdap", jobDefWithJitter(time.Hour), jobs.syncLdap, true) } func (j *LdapJobs) syncLdap(ctx context.Context) error { diff --git a/backend/internal/job/scheduler.go b/backend/internal/job/scheduler.go index 2a48c2a8..85b7e1ff 100644 --- a/backend/internal/job/scheduler.go +++ b/backend/internal/job/scheduler.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "time" "github.com/go-co-op/gocron/v2" "github.com/google/uuid" @@ -33,16 +34,12 @@ func (s *Scheduler) RemoveJob(name string) error { if job.Name() == name { err := s.scheduler.RemoveJob(job.ID()) if err != nil { - errs = append(errs, fmt.Errorf("failed to unqueue job %q with ID %q: %w", name, job.ID().String(), err)) + errs = append(errs, fmt.Errorf("failed to dequeue job %q with ID %q: %w", name, job.ID().String(), err)) } } } - if len(errs) > 0 { - return errors.Join(errs...) - } - - return nil + return errors.Join(errs...) } // Run the scheduler. @@ -105,3 +102,9 @@ func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.Job return nil } + +func jobDefWithJitter(interval time.Duration) gocron.JobDefinition { + const jitter = 5 * time.Minute + + return gocron.DurationRandomJob(interval-jitter, interval+jitter) +} diff --git a/backend/internal/job/scim_job.go b/backend/internal/job/scim_job.go index 1ea8ee96..29762f48 100644 --- a/backend/internal/job/scim_job.go +++ b/backend/internal/job/scim_job.go @@ -16,8 +16,8 @@ type ScimJobs struct { func (s *Scheduler) RegisterScimJobs(ctx context.Context, scimService *service.ScimService) error { jobs := &ScimJobs{scimService: scimService} - // Register the job to run every hour - return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, true) + // Register the job to run every hour (with some jitter) + return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(24*time.Hour), jobs.SyncScim, true) } func (j *ScimJobs) SyncScim(ctx context.Context) error { diff --git a/backend/internal/service/scim_service.go b/backend/internal/service/scim_service.go index 976e2cd5..c8fc4339 100644 --- a/backend/internal/service/scim_service.go +++ b/backend/internal/service/scim_service.go @@ -168,7 +168,8 @@ func (s *ScimService) SyncAll(ctx context.Context) error { errs = append(errs, ctx.Err()) break } - if err := s.SyncServiceProvider(ctx, provider.ID); err != nil { + err = s.SyncServiceProvider(ctx, provider.ID) + if err != nil { errs = append(errs, fmt.Errorf("failed to sync SCIM provider %s: %w", provider.ID, err)) } } @@ -210,26 +211,20 @@ func (s *ScimService) SyncServiceProvider(ctx context.Context, serviceProviderID } var errs []error - var userStats scimSyncStats - var groupStats scimSyncStats // Sync users first, so that groups can reference them - if stats, err := s.syncUsers(ctx, provider, users, &userResources); err != nil { - errs = append(errs, err) - userStats = stats - } else { - userStats = stats - } - - stats, err := s.syncGroups(ctx, provider, groups, groupResources.Resources, userResources.Resources) + userStats, err := s.syncUsers(ctx, provider, users, &userResources) + if err != nil { + errs = append(errs, err) + } + + groupStats, err := s.syncGroups(ctx, provider, groups, groupResources.Resources, userResources.Resources) if err != nil { errs = append(errs, err) - groupStats = stats - } else { - groupStats = stats } if len(errs) > 0 { + err = errors.Join(errs...) slog.WarnContext(ctx, "SCIM sync completed with errors", slog.String("provider_id", provider.ID), slog.Int("error_count", len(errs)), @@ -240,12 +235,14 @@ func (s *ScimService) SyncServiceProvider(ctx context.Context, serviceProviderID slog.Int("groups_updated", groupStats.Updated), slog.Int("groups_deleted", groupStats.Deleted), slog.Duration("duration", time.Since(start)), + slog.Any("error", err), ) - return errors.Join(errs...) + return err } provider.LastSyncedAt = new(datatype.DateTime(time.Now())) - if err := s.db.WithContext(ctx).Save(&provider).Error; err != nil { + err = s.db.WithContext(ctx).Save(&provider).Error + if err != nil { return err } @@ -273,7 +270,7 @@ func (s *ScimService) syncUsers( // Update or create users for _, u := range users { - existing := getResourceByExternalID[dto.ScimUser](u.ID, resourceList.Resources) + existing := getResourceByExternalID(u.ID, resourceList.Resources) action, created, err := s.syncUser(ctx, provider, u, existing) if created != nil && existing == nil { @@ -434,7 +431,7 @@ func (s *ScimService) syncGroup( // Prepare group members members := make([]dto.ScimGroupMember, len(group.Users)) for i, user := range group.Users { - userResource := getResourceByExternalID[dto.ScimUser](user.ID, userResources) + userResource := getResourceByExternalID(user.ID, userResources) if userResource == nil { // Groups depend on user IDs already being provisioned return scimActionNone, fmt.Errorf("cannot sync group %s: user %s is not provisioned in SCIM provider", group.ID, user.ID)