mirror of
https://github.com/pocket-id/pocket-id.git
synced 2026-03-22 19:40:08 +00:00
Ensure cleanup jobs have some jitter
This commit is contained in:
@@ -7,7 +7,6 @@ import (
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/pocket-id/pocket-id/backend/internal/common"
|
||||
@@ -18,17 +17,15 @@ import (
|
||||
func (s *Scheduler) RegisterDbCleanupJobs(ctx context.Context, db *gorm.DB) error {
|
||||
jobs := &DbCleanupJobs{db: db}
|
||||
|
||||
// Run every 24 hours (but with some jitter so they don't run at the exact same time), and now
|
||||
def := gocron.DurationRandomJob(24*time.Hour-2*time.Minute, 24*time.Hour+2*time.Minute)
|
||||
return errors.Join(
|
||||
s.RegisterJob(ctx, "ClearWebauthnSessions", def, jobs.clearWebauthnSessions, true),
|
||||
s.RegisterJob(ctx, "ClearOneTimeAccessTokens", def, jobs.clearOneTimeAccessTokens, true),
|
||||
s.RegisterJob(ctx, "ClearSignupTokens", def, jobs.clearSignupTokens, true),
|
||||
s.RegisterJob(ctx, "ClearEmailVerificationTokens", def, jobs.clearEmailVerificationTokens, true),
|
||||
s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", def, jobs.clearOidcAuthorizationCodes, true),
|
||||
s.RegisterJob(ctx, "ClearOidcRefreshTokens", def, jobs.clearOidcRefreshTokens, true),
|
||||
s.RegisterJob(ctx, "ClearReauthenticationTokens", def, jobs.clearReauthenticationTokens, true),
|
||||
s.RegisterJob(ctx, "ClearAuditLogs", def, jobs.clearAuditLogs, true),
|
||||
s.RegisterJob(ctx, "ClearWebauthnSessions", jobDefWithJitter(24*time.Hour), jobs.clearWebauthnSessions, true),
|
||||
s.RegisterJob(ctx, "ClearOneTimeAccessTokens", jobDefWithJitter(24*time.Hour), jobs.clearOneTimeAccessTokens, true),
|
||||
s.RegisterJob(ctx, "ClearSignupTokens", jobDefWithJitter(24*time.Hour), jobs.clearSignupTokens, true),
|
||||
s.RegisterJob(ctx, "ClearEmailVerificationTokens", jobDefWithJitter(24*time.Hour), jobs.clearEmailVerificationTokens, true),
|
||||
s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", jobDefWithJitter(24*time.Hour), jobs.clearOidcAuthorizationCodes, true),
|
||||
s.RegisterJob(ctx, "ClearOidcRefreshTokens", jobDefWithJitter(24*time.Hour), jobs.clearOidcRefreshTokens, true),
|
||||
s.RegisterJob(ctx, "ClearReauthenticationTokens", jobDefWithJitter(24*time.Hour), jobs.clearReauthenticationTokens, true),
|
||||
s.RegisterJob(ctx, "ClearAuditLogs", jobDefWithJitter(24*time.Hour), jobs.clearAuditLogs, true),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -19,14 +19,19 @@ import (
|
||||
func (s *Scheduler) RegisterFileCleanupJobs(ctx context.Context, db *gorm.DB, fileStorage storage.FileStorage) error {
|
||||
jobs := &FileCleanupJobs{db: db, fileStorage: fileStorage}
|
||||
|
||||
err := s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, false)
|
||||
var errs []error
|
||||
errs = append(errs,
|
||||
s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, false),
|
||||
)
|
||||
|
||||
// Only necessary for file system storage
|
||||
if fileStorage.Type() == storage.TypeFileSystem {
|
||||
err = errors.Join(err, s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, true))
|
||||
errs = append(errs,
|
||||
s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, true),
|
||||
)
|
||||
}
|
||||
|
||||
return err
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
type FileCleanupJobs struct {
|
||||
@@ -68,7 +73,8 @@ func (j *FileCleanupJobs) clearUnusedDefaultProfilePictures(ctx context.Context)
|
||||
// If these initials aren't used by any user, delete the file
|
||||
if _, ok := initialsInUse[initials]; !ok {
|
||||
filePath := path.Join(defaultPicturesDir, filename)
|
||||
if err := j.fileStorage.Delete(ctx, filePath); err != nil {
|
||||
err = j.fileStorage.Delete(ctx, filePath)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Failed to delete unused default profile picture", slog.String("path", filePath), slog.Any("error", err))
|
||||
} else {
|
||||
filesDeleted++
|
||||
@@ -95,8 +101,9 @@ func (j *FileCleanupJobs) clearOrphanedTempFiles(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := j.fileStorage.Delete(ctx, p.Path); err != nil {
|
||||
slog.ErrorContext(ctx, "Failed to delete temp file", slog.String("path", p.Path), slog.Any("error", err))
|
||||
rErr := j.fileStorage.Delete(ctx, p.Path)
|
||||
if rErr != nil {
|
||||
slog.ErrorContext(ctx, "Failed to delete temp file", slog.String("path", p.Path), slog.Any("error", rErr))
|
||||
return nil
|
||||
}
|
||||
deleted++
|
||||
|
||||
@@ -4,8 +4,6 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
|
||||
"github.com/pocket-id/pocket-id/backend/internal/service"
|
||||
)
|
||||
|
||||
@@ -17,8 +15,8 @@ type LdapJobs struct {
|
||||
func (s *Scheduler) RegisterLdapJobs(ctx context.Context, ldapService *service.LdapService, appConfigService *service.AppConfigService) error {
|
||||
jobs := &LdapJobs{ldapService: ldapService, appConfigService: appConfigService}
|
||||
|
||||
// Register the job to run every hour
|
||||
return s.RegisterJob(ctx, "SyncLdap", gocron.DurationJob(time.Hour), jobs.syncLdap, true)
|
||||
// Register the job to run every hour (with some jitter)
|
||||
return s.RegisterJob(ctx, "SyncLdap", jobDefWithJitter(time.Hour), jobs.syncLdap, true)
|
||||
}
|
||||
|
||||
func (j *LdapJobs) syncLdap(ctx context.Context) error {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"github.com/google/uuid"
|
||||
@@ -33,16 +34,12 @@ func (s *Scheduler) RemoveJob(name string) error {
|
||||
if job.Name() == name {
|
||||
err := s.scheduler.RemoveJob(job.ID())
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("failed to unqueue job %q with ID %q: %w", name, job.ID().String(), err))
|
||||
errs = append(errs, fmt.Errorf("failed to dequeue job %q with ID %q: %w", name, job.ID().String(), err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
return nil
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Run the scheduler.
|
||||
@@ -105,3 +102,9 @@ func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.Job
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func jobDefWithJitter(interval time.Duration) gocron.JobDefinition {
|
||||
const jitter = 5 * time.Minute
|
||||
|
||||
return gocron.DurationRandomJob(interval-jitter, interval+jitter)
|
||||
}
|
||||
|
||||
@@ -16,8 +16,8 @@ type ScimJobs struct {
|
||||
func (s *Scheduler) RegisterScimJobs(ctx context.Context, scimService *service.ScimService) error {
|
||||
jobs := &ScimJobs{scimService: scimService}
|
||||
|
||||
// Register the job to run every hour
|
||||
return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, true)
|
||||
// Register the job to run every hour (with some jitter)
|
||||
return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(24*time.Hour), jobs.SyncScim, true)
|
||||
}
|
||||
|
||||
func (j *ScimJobs) SyncScim(ctx context.Context) error {
|
||||
|
||||
@@ -168,7 +168,8 @@ func (s *ScimService) SyncAll(ctx context.Context) error {
|
||||
errs = append(errs, ctx.Err())
|
||||
break
|
||||
}
|
||||
if err := s.SyncServiceProvider(ctx, provider.ID); err != nil {
|
||||
err = s.SyncServiceProvider(ctx, provider.ID)
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("failed to sync SCIM provider %s: %w", provider.ID, err))
|
||||
}
|
||||
}
|
||||
@@ -210,26 +211,20 @@ func (s *ScimService) SyncServiceProvider(ctx context.Context, serviceProviderID
|
||||
}
|
||||
|
||||
var errs []error
|
||||
var userStats scimSyncStats
|
||||
var groupStats scimSyncStats
|
||||
|
||||
// Sync users first, so that groups can reference them
|
||||
if stats, err := s.syncUsers(ctx, provider, users, &userResources); err != nil {
|
||||
errs = append(errs, err)
|
||||
userStats = stats
|
||||
} else {
|
||||
userStats = stats
|
||||
}
|
||||
|
||||
stats, err := s.syncGroups(ctx, provider, groups, groupResources.Resources, userResources.Resources)
|
||||
userStats, err := s.syncUsers(ctx, provider, users, &userResources)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
groupStats, err := s.syncGroups(ctx, provider, groups, groupResources.Resources, userResources.Resources)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
groupStats = stats
|
||||
} else {
|
||||
groupStats = stats
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
err = errors.Join(errs...)
|
||||
slog.WarnContext(ctx, "SCIM sync completed with errors",
|
||||
slog.String("provider_id", provider.ID),
|
||||
slog.Int("error_count", len(errs)),
|
||||
@@ -240,12 +235,14 @@ func (s *ScimService) SyncServiceProvider(ctx context.Context, serviceProviderID
|
||||
slog.Int("groups_updated", groupStats.Updated),
|
||||
slog.Int("groups_deleted", groupStats.Deleted),
|
||||
slog.Duration("duration", time.Since(start)),
|
||||
slog.Any("error", err),
|
||||
)
|
||||
return errors.Join(errs...)
|
||||
return err
|
||||
}
|
||||
|
||||
provider.LastSyncedAt = new(datatype.DateTime(time.Now()))
|
||||
if err := s.db.WithContext(ctx).Save(&provider).Error; err != nil {
|
||||
err = s.db.WithContext(ctx).Save(&provider).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -273,7 +270,7 @@ func (s *ScimService) syncUsers(
|
||||
|
||||
// Update or create users
|
||||
for _, u := range users {
|
||||
existing := getResourceByExternalID[dto.ScimUser](u.ID, resourceList.Resources)
|
||||
existing := getResourceByExternalID(u.ID, resourceList.Resources)
|
||||
|
||||
action, created, err := s.syncUser(ctx, provider, u, existing)
|
||||
if created != nil && existing == nil {
|
||||
@@ -434,7 +431,7 @@ func (s *ScimService) syncGroup(
|
||||
// Prepare group members
|
||||
members := make([]dto.ScimGroupMember, len(group.Users))
|
||||
for i, user := range group.Users {
|
||||
userResource := getResourceByExternalID[dto.ScimUser](user.ID, userResources)
|
||||
userResource := getResourceByExternalID(user.ID, userResources)
|
||||
if userResource == nil {
|
||||
// Groups depend on user IDs already being provisioned
|
||||
return scimActionNone, fmt.Errorf("cannot sync group %s: user %s is not provisioned in SCIM provider", group.ID, user.ID)
|
||||
|
||||
Reference in New Issue
Block a user