mirror of
https://github.com/pocket-id/pocket-id.git
synced 2026-03-22 18:30:09 +00:00
Improvements to AppLockService
This commit is contained in:
@@ -174,7 +174,8 @@ func (s *AppLockService) RunRenewal(ctx context.Context) error {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if err := s.renew(ctx); err != nil {
|
err := s.renew(ctx)
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("renew lock: %w", err)
|
return fmt.Errorf("renew lock: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -183,33 +184,43 @@ func (s *AppLockService) RunRenewal(ctx context.Context) error {
|
|||||||
|
|
||||||
// Release releases the lock if it is held by this process.
|
// Release releases the lock if it is held by this process.
|
||||||
func (s *AppLockService) Release(ctx context.Context) error {
|
func (s *AppLockService) Release(ctx context.Context) error {
|
||||||
opCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
db, err := s.db.DB()
|
||||||
defer cancel()
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get DB connection: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
var query string
|
var query string
|
||||||
switch s.db.Name() {
|
switch s.db.Name() {
|
||||||
case "sqlite":
|
case "sqlite":
|
||||||
query = `
|
query = `
|
||||||
DELETE FROM kv
|
DELETE FROM kv
|
||||||
WHERE key = ?
|
WHERE key = ?
|
||||||
AND json_extract(value, '$.lock_id') = ?
|
AND json_extract(value, '$.lock_id') = ?
|
||||||
`
|
`
|
||||||
case "postgres":
|
case "postgres":
|
||||||
query = `
|
query = `
|
||||||
DELETE FROM kv
|
DELETE FROM kv
|
||||||
WHERE key = $1
|
WHERE key = $1
|
||||||
AND value::json->>'lock_id' = $2
|
AND value::json->>'lock_id' = $2
|
||||||
`
|
`
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unsupported database dialect: %s", s.db.Name())
|
return fmt.Errorf("unsupported database dialect: %s", s.db.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
res := s.db.WithContext(opCtx).Exec(query, lockKey, s.lockID)
|
opCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||||
if res.Error != nil {
|
defer cancel()
|
||||||
return fmt.Errorf("release lock failed: %w", res.Error)
|
|
||||||
|
res, err := db.ExecContext(opCtx, query, lockKey, s.lockID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("release lock failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.RowsAffected == 0 {
|
count, err := res.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to count affected rows: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if count == 0 {
|
||||||
slog.Warn("Application lock not held by this process, cannot release",
|
slog.Warn("Application lock not held by this process, cannot release",
|
||||||
slog.Int64("process_id", s.processID),
|
slog.Int64("process_id", s.processID),
|
||||||
slog.String("host_id", s.hostID),
|
slog.String("host_id", s.hostID),
|
||||||
@@ -225,6 +236,11 @@ func (s *AppLockService) Release(ctx context.Context) error {
|
|||||||
|
|
||||||
// renew tries to renew the lock, retrying up to renewRetries times (sleeping 1s between attempts).
|
// renew tries to renew the lock, retrying up to renewRetries times (sleeping 1s between attempts).
|
||||||
func (s *AppLockService) renew(ctx context.Context) error {
|
func (s *AppLockService) renew(ctx context.Context) error {
|
||||||
|
db, err := s.db.DB()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get DB connection: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
var lastErr error
|
var lastErr error
|
||||||
for attempt := 1; attempt <= renewRetries; attempt++ {
|
for attempt := 1; attempt <= renewRetries; attempt++ {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
@@ -246,42 +262,56 @@ func (s *AppLockService) renew(ctx context.Context) error {
|
|||||||
switch s.db.Name() {
|
switch s.db.Name() {
|
||||||
case "sqlite":
|
case "sqlite":
|
||||||
query = `
|
query = `
|
||||||
UPDATE kv
|
UPDATE kv
|
||||||
SET value = ?
|
SET value = ?
|
||||||
WHERE key = ?
|
WHERE key = ?
|
||||||
AND json_extract(value, '$.lock_id') = ?
|
AND json_extract(value, '$.lock_id') = ?
|
||||||
AND json_extract(value, '$.expires_at') > ?
|
AND json_extract(value, '$.expires_at') > ?
|
||||||
`
|
`
|
||||||
case "postgres":
|
case "postgres":
|
||||||
query = `
|
query = `
|
||||||
UPDATE kv
|
UPDATE kv
|
||||||
SET value = $1
|
SET value = $1
|
||||||
WHERE key = $2
|
WHERE key = $2
|
||||||
AND value::json->>'lock_id' = $3
|
AND value::json->>'lock_id' = $3
|
||||||
AND ((value::json->>'expires_at')::bigint > $4)
|
AND ((value::json->>'expires_at')::bigint > $4)
|
||||||
`
|
`
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unsupported database dialect: %s", s.db.Name())
|
return fmt.Errorf("unsupported database dialect: %s", s.db.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
opCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
opCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||||
res := s.db.WithContext(opCtx).Exec(query, raw, lockKey, s.lockID, nowUnix)
|
res, err := db.ExecContext(opCtx, query, raw, lockKey, s.lockID, nowUnix)
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
switch {
|
// Query succeeded, but may have updated 0 rows
|
||||||
case res.Error != nil:
|
if err == nil {
|
||||||
lastErr = fmt.Errorf("lock renewal failed: %w", res.Error)
|
count, err := res.RowsAffected()
|
||||||
case res.RowsAffected == 0:
|
if err != nil {
|
||||||
// Must be after checking res.Error
|
return fmt.Errorf("failed to count affected rows: %w", err)
|
||||||
return ErrLockLost
|
}
|
||||||
default:
|
|
||||||
|
// If no rows were updated, we lost the lock
|
||||||
|
if count == 0 {
|
||||||
|
return ErrLockLost
|
||||||
|
}
|
||||||
|
|
||||||
|
// All good
|
||||||
slog.Debug("Renewed application lock",
|
slog.Debug("Renewed application lock",
|
||||||
slog.Int64("process_id", s.processID),
|
slog.Int64("process_id", s.processID),
|
||||||
slog.String("host_id", s.hostID),
|
slog.String("host_id", s.hostID),
|
||||||
|
slog.Duration("duration", time.Since(now)),
|
||||||
)
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we're here, we have an error that can be retried
|
||||||
|
slog.Debug("Application lock renewal attempt failed",
|
||||||
|
slog.Any("error", err),
|
||||||
|
slog.Duration("duration", time.Since(now)),
|
||||||
|
)
|
||||||
|
lastErr = fmt.Errorf("lock renewal failed: %w", err)
|
||||||
|
|
||||||
// Wait before next attempt or cancel if context is done
|
// Wait before next attempt or cancel if context is done
|
||||||
if attempt < renewRetries {
|
if attempt < renewRetries {
|
||||||
select {
|
select {
|
||||||
|
|||||||
Reference in New Issue
Block a user