From 6d6dc6646a39921a604b6c825d3e7e76af6c693b Mon Sep 17 00:00:00 2001 From: "Alessandro (Ale) Segala" <43508+ItalyPaleAle@users.noreply.github.com> Date: Thu, 29 May 2025 08:15:35 -0700 Subject: [PATCH] fix: run jobs at interval instead of specific time (#585) --- backend/internal/job/analytics_job.go | 4 +- backend/internal/job/api_key_expiry_job.go | 16 +++-- backend/internal/job/db_cleanup_job.go | 75 +++++++++++++++------ backend/internal/job/file_cleanup_job.go | 11 +-- backend/internal/job/geoloite_update_job.go | 7 +- backend/internal/job/ldap_job.go | 5 +- backend/internal/job/scheduler.go | 33 +++++---- 7 files changed, 105 insertions(+), 46 deletions(-) diff --git a/backend/internal/job/analytics_job.go b/backend/internal/job/analytics_job.go index 53895cae..468d45f0 100644 --- a/backend/internal/job/analytics_job.go +++ b/backend/internal/job/analytics_job.go @@ -9,6 +9,7 @@ import ( "time" backoff "github.com/cenkalti/backoff/v5" + "github.com/go-co-op/gocron/v2" "github.com/pocket-id/pocket-id/backend/internal/common" "github.com/pocket-id/pocket-id/backend/internal/service" @@ -22,11 +23,12 @@ func (s *Scheduler) RegisterAnalyticsJob(ctx context.Context, appConfig *service return nil } + // Send every 24 hours jobs := &AnalyticsJob{ appConfig: appConfig, httpClient: httpClient, } - return s.registerJob(ctx, "SendHeartbeat", "0 0 * * *", jobs.sendHeartbeat, true) + return s.registerJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, true) } type AnalyticsJob struct { diff --git a/backend/internal/job/api_key_expiry_job.go b/backend/internal/job/api_key_expiry_job.go index 6793cf00..3bd82d11 100644 --- a/backend/internal/job/api_key_expiry_job.go +++ b/backend/internal/job/api_key_expiry_job.go @@ -2,7 +2,10 @@ package job import ( "context" - "log" + "fmt" + "log/slog" + + "github.com/go-co-op/gocron/v2" "github.com/pocket-id/pocket-id/backend/internal/service" ) @@ -18,7 +21,8 @@ func (s *Scheduler) RegisterApiKeyExpiryJob(ctx context.Context, apiKeyService * appConfigService: appConfigService, } - return s.registerJob(ctx, "ExpiredApiKeyEmailJob", "0 0 * * *", jobs.checkAndNotifyExpiringApiKeys, false) + // Send every day at midnight + return s.registerJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, false) } func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) error { @@ -29,16 +33,16 @@ func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) err apiKeys, err := j.apiKeyService.ListExpiringApiKeys(ctx, 7) if err != nil { - log.Printf("Failed to list expiring API keys: %v", err) - return err + return fmt.Errorf("failed to list expiring API keys: %w", err) } for _, key := range apiKeys { if key.User.Email == "" { continue } - if err := j.apiKeyService.SendApiKeyExpiringSoonEmail(ctx, key); err != nil { - log.Printf("Failed to send email for key %s: %v", key.ID, err) + err = j.apiKeyService.SendApiKeyExpiringSoonEmail(ctx, key) + if err != nil { + slog.ErrorContext(ctx, "Failed to send expiring API key notification email", slog.String("key", key.ID), slog.Any("error", err)) } } return nil diff --git a/backend/internal/job/db_cleanup_job.go b/backend/internal/job/db_cleanup_job.go index 45b44044..b7e9737d 100644 --- a/backend/internal/job/db_cleanup_job.go +++ b/backend/internal/job/db_cleanup_job.go @@ -3,8 +3,11 @@ package job import ( "context" "errors" + "fmt" + "log/slog" "time" + "github.com/go-co-op/gocron/v2" "gorm.io/gorm" "github.com/pocket-id/pocket-id/backend/internal/model" @@ -14,12 +17,14 @@ import ( func (s *Scheduler) RegisterDbCleanupJobs(ctx context.Context, db *gorm.DB) error { jobs := &DbCleanupJobs{db: db} + // Run every 24 hours (but with some jitter so they don't run at the exact same time), and now + def := gocron.DurationRandomJob(24*time.Hour-2*time.Minute, 24*time.Hour+2*time.Minute) return errors.Join( - s.registerJob(ctx, "ClearWebauthnSessions", "0 3 * * *", jobs.clearWebauthnSessions, false), - s.registerJob(ctx, "ClearOneTimeAccessTokens", "0 3 * * *", jobs.clearOneTimeAccessTokens, false), - s.registerJob(ctx, "ClearOidcAuthorizationCodes", "0 3 * * *", jobs.clearOidcAuthorizationCodes, false), - s.registerJob(ctx, "ClearOidcRefreshTokens", "0 3 * * *", jobs.clearOidcRefreshTokens, false), - s.registerJob(ctx, "ClearAuditLogs", "0 3 * * *", jobs.clearAuditLogs, false), + s.registerJob(ctx, "ClearWebauthnSessions", def, jobs.clearWebauthnSessions, true), + s.registerJob(ctx, "ClearOneTimeAccessTokens", def, jobs.clearOneTimeAccessTokens, true), + s.registerJob(ctx, "ClearOidcAuthorizationCodes", def, jobs.clearOidcAuthorizationCodes, true), + s.registerJob(ctx, "ClearOidcRefreshTokens", def, jobs.clearOidcRefreshTokens, true), + s.registerJob(ctx, "ClearAuditLogs", def, jobs.clearAuditLogs, true), ) } @@ -29,40 +34,70 @@ type DbCleanupJobs struct { // ClearWebauthnSessions deletes WebAuthn sessions that have expired func (j *DbCleanupJobs) clearWebauthnSessions(ctx context.Context) error { - return j.db. + st := j.db. WithContext(ctx). - Delete(&model.WebauthnSession{}, "expires_at < ?", datatype.DateTime(time.Now())). - Error + Delete(&model.WebauthnSession{}, "expires_at < ?", datatype.DateTime(time.Now())) + if st.Error != nil { + return fmt.Errorf("failed to clean expired WebAuthn sessions: %w", st.Error) + } + + slog.InfoContext(ctx, "Cleaned expired WebAuthn sessions", slog.Int64("count", st.RowsAffected)) + + return nil } // ClearOneTimeAccessTokens deletes one-time access tokens that have expired func (j *DbCleanupJobs) clearOneTimeAccessTokens(ctx context.Context) error { - return j.db. + st := j.db. WithContext(ctx). - Delete(&model.OneTimeAccessToken{}, "expires_at < ?", datatype.DateTime(time.Now())). - Error + Delete(&model.OneTimeAccessToken{}, "expires_at < ?", datatype.DateTime(time.Now())) + if st.Error != nil { + return fmt.Errorf("failed to clean expired one-time access tokens: %w", st.Error) + } + + slog.InfoContext(ctx, "Cleaned expired one-time access tokens", slog.Int64("count", st.RowsAffected)) + + return nil } // ClearOidcAuthorizationCodes deletes OIDC authorization codes that have expired func (j *DbCleanupJobs) clearOidcAuthorizationCodes(ctx context.Context) error { - return j.db. + st := j.db. WithContext(ctx). - Delete(&model.OidcAuthorizationCode{}, "expires_at < ?", datatype.DateTime(time.Now())). - Error + Delete(&model.OidcAuthorizationCode{}, "expires_at < ?", datatype.DateTime(time.Now())) + if st.Error != nil { + return fmt.Errorf("failed to clean expired OIDC authorization codes: %w", st.Error) + } + + slog.InfoContext(ctx, "Cleaned expired OIDC authorization codes", slog.Int64("count", st.RowsAffected)) + + return nil } // ClearOidcAuthorizationCodes deletes OIDC authorization codes that have expired func (j *DbCleanupJobs) clearOidcRefreshTokens(ctx context.Context) error { - return j.db. + st := j.db. WithContext(ctx). - Delete(&model.OidcRefreshToken{}, "expires_at < ?", datatype.DateTime(time.Now())). - Error + Delete(&model.OidcRefreshToken{}, "expires_at < ?", datatype.DateTime(time.Now())) + if st.Error != nil { + return fmt.Errorf("failed to clean expired OIDC refresh tokens: %w", st.Error) + } + + slog.InfoContext(ctx, "Cleaned expired OIDC refresh tokens", slog.Int64("count", st.RowsAffected)) + + return nil } // ClearAuditLogs deletes audit logs older than 90 days func (j *DbCleanupJobs) clearAuditLogs(ctx context.Context) error { - return j.db. + st := j.db. WithContext(ctx). - Delete(&model.AuditLog{}, "created_at < ?", datatype.DateTime(time.Now().AddDate(0, 0, -90))). - Error + Delete(&model.AuditLog{}, "created_at < ?", datatype.DateTime(time.Now().AddDate(0, 0, -90))) + if st.Error != nil { + return fmt.Errorf("failed to delete old audit logs: %w", st.Error) + } + + slog.InfoContext(ctx, "Deleted old audit logs", slog.Int64("count", st.RowsAffected)) + + return nil } diff --git a/backend/internal/job/file_cleanup_job.go b/backend/internal/job/file_cleanup_job.go index 427f511a..29598156 100644 --- a/backend/internal/job/file_cleanup_job.go +++ b/backend/internal/job/file_cleanup_job.go @@ -3,11 +3,13 @@ package job import ( "context" "fmt" - "log" + "log/slog" "os" "path/filepath" "strings" + "time" + "github.com/go-co-op/gocron/v2" "gorm.io/gorm" "github.com/pocket-id/pocket-id/backend/internal/common" @@ -17,7 +19,8 @@ import ( func (s *Scheduler) RegisterFileCleanupJobs(ctx context.Context, db *gorm.DB) error { jobs := &FileCleanupJobs{db: db} - return s.registerJob(ctx, "ClearUnusedDefaultProfilePictures", "0 2 * * 0", jobs.clearUnusedDefaultProfilePictures, false) + // Run every 24 hours + return s.registerJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, false) } type FileCleanupJobs struct { @@ -64,13 +67,13 @@ func (j *FileCleanupJobs) clearUnusedDefaultProfilePictures(ctx context.Context) if _, ok := initialsInUse[initials]; !ok { filePath := filepath.Join(defaultPicturesDir, filename) if err := os.Remove(filePath); err != nil { - log.Printf("Failed to delete unused default profile picture %s: %v", filePath, err) + slog.ErrorContext(ctx, "Failed to delete unused default profile picture", slog.String("path", filePath), slog.Any("error", err)) } else { filesDeleted++ } } } - log.Printf("Deleted %d unused default profile pictures", filesDeleted) + slog.Info("Done deleting unused default profile pictures", slog.Int("count", filesDeleted)) return nil } diff --git a/backend/internal/job/geoloite_update_job.go b/backend/internal/job/geoloite_update_job.go index 46001274..59419c3c 100644 --- a/backend/internal/job/geoloite_update_job.go +++ b/backend/internal/job/geoloite_update_job.go @@ -2,6 +2,9 @@ package job import ( "context" + "time" + + "github.com/go-co-op/gocron/v2" "github.com/pocket-id/pocket-id/backend/internal/service" ) @@ -19,8 +22,8 @@ func (s *Scheduler) RegisterGeoLiteUpdateJobs(ctx context.Context, geoLiteServic jobs := &GeoLiteUpdateJobs{geoLiteService: geoLiteService} - // Register the job to run every day, at 5 minutes past midnight - return s.registerJob(ctx, "UpdateGeoLiteDB", "5 * */1 * *", jobs.updateGoeLiteDB, true) + // Run every 24 hours (and right away) + return s.registerJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, true) } func (j *GeoLiteUpdateJobs) updateGoeLiteDB(ctx context.Context) error { diff --git a/backend/internal/job/ldap_job.go b/backend/internal/job/ldap_job.go index 95859bd1..ecb93212 100644 --- a/backend/internal/job/ldap_job.go +++ b/backend/internal/job/ldap_job.go @@ -2,6 +2,9 @@ package job import ( "context" + "time" + + "github.com/go-co-op/gocron/v2" "github.com/pocket-id/pocket-id/backend/internal/service" ) @@ -15,7 +18,7 @@ func (s *Scheduler) RegisterLdapJobs(ctx context.Context, ldapService *service.L jobs := &LdapJobs{ldapService: ldapService, appConfigService: appConfigService} // Register the job to run every hour - return s.registerJob(ctx, "SyncLdap", "0 * * * *", jobs.syncLdap, true) + return s.registerJob(ctx, "SyncLdap", gocron.DurationJob(time.Hour), jobs.syncLdap, true) } func (j *LdapJobs) syncLdap(ctx context.Context) error { diff --git a/backend/internal/job/scheduler.go b/backend/internal/job/scheduler.go index 4d71f8d6..20461702 100644 --- a/backend/internal/job/scheduler.go +++ b/backend/internal/job/scheduler.go @@ -3,7 +3,7 @@ package job import ( "context" "fmt" - "log" + "log/slog" "github.com/go-co-op/gocron/v2" "github.com/google/uuid" @@ -27,7 +27,7 @@ func NewScheduler() (*Scheduler, error) { // Run the scheduler. // This function blocks until the context is canceled. func (s *Scheduler) Run(ctx context.Context) error { - log.Println("Starting job scheduler") + slog.Info("Starting job scheduler") s.scheduler.Start() // Block until context is canceled @@ -35,23 +35,36 @@ func (s *Scheduler) Run(ctx context.Context) error { err := s.scheduler.Shutdown() if err != nil { - log.Printf("[WARN] Error shutting down job scheduler: %v", err) + slog.Error("Error shutting down job scheduler", slog.Any("error", err)) } else { - log.Println("Job scheduler shut down") + slog.Info("Job scheduler shut down") } return nil } -func (s *Scheduler) registerJob(ctx context.Context, name string, interval string, job func(ctx context.Context) error, runImmediately bool) error { +func (s *Scheduler) registerJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool) error { jobOptions := []gocron.JobOption{ gocron.WithContext(ctx), gocron.WithEventListeners( + gocron.BeforeJobRuns(func(jobID uuid.UUID, jobName string) { + slog.Info("Starting job", + slog.String("name", name), + slog.String("id", jobID.String()), + ) + }), gocron.AfterJobRuns(func(jobID uuid.UUID, jobName string) { - log.Printf("Job %q run successfully", name) + slog.Info("Job run successfully", + slog.String("name", name), + slog.String("id", jobID.String()), + ) }), gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) { - log.Printf("Job %q failed with error: %v", name, err) + slog.Error("Job failed with error", + slog.String("name", name), + slog.String("id", jobID.String()), + slog.Any("error", err), + ) }), ), } @@ -60,11 +73,7 @@ func (s *Scheduler) registerJob(ctx context.Context, name string, interval strin jobOptions = append(jobOptions, gocron.JobOption(gocron.WithStartImmediately())) } - _, err := s.scheduler.NewJob( - gocron.CronJob(interval, false), - gocron.NewTask(job), - jobOptions..., - ) + _, err := s.scheduler.NewJob(def, gocron.NewTask(job), jobOptions...) if err != nil { return fmt.Errorf("failed to register job %q: %w", name, err)