Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save adamjez/8b64091fedd588780800aa7eafa2cf9a to your computer and use it in GitHub Desktop.

Select an option

Save adamjez/8b64091fedd588780800aa7eafa2cf9a to your computer and use it in GitHub Desktop.
SkipWhenPreviousJobIsRunningAttribute.cs
// Zero-Clause BSD (more permissive than MIT, doesn't require copyright notice)
//
// Permission to use, copy, modify, and/or distribute this software for any purpose
// with or without fee is hereby granted.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
// OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
// TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
// THIS SOFTWARE.
// Hangfire.Core 1.8+ is required, for previous versions please see revision from year 2022.
using System;
using System.Collections.Generic;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;
namespace ConsoleApp;
public class SkipWhenPreviousJobIsRunningAttribute : JobFilterAttribute, IClientFilter, IApplyStateFilter
{
private const string RunningKey = "Running";
private const string RunningJobIdKey = "RunningJobId";
private const string RecurringJobIdKey = "RecurringJobId";
private const string JobIsRunning = "yes";
private const string JobIsNotRunning = "no";
public void OnCreating(CreatingContext context)
{
// We can't handle old storages
if (context.Connection is not JobStorageConnection connection)
{
throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
}
// We should run this filter only for background jobs based on
// recurring ones
if (!context.Parameters.TryGetValue(RecurringJobIdKey, out var parameter))
{
return;
}
var recurringJobId = parameter as string;
// RecurringJobId is malformed. This should not happen, but anyway.
if (string.IsNullOrWhiteSpace(recurringJobId))
{
return;
}
var running = connection.GetValueFromHash(CreateRecurringJobKey(recurringJobId), RunningKey);
if (JobIsRunning.Equals(running, StringComparison.OrdinalIgnoreCase))
{
var runningJobId = connection.GetValueFromHash(CreateRecurringJobKey(recurringJobId), RunningJobIdKey);
if (IsJobStale(connection, runningJobId))
{
ResetJobStatus(connection, recurringJobId);
}
else
{
context.Canceled = true;
}
}
}
public void OnCreated(CreatedContext filterContext)
{
}
public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
{
if (context.NewState is EnqueuedState)
{
TrySetRunningFlag(context, JobIsRunning);
}
else if ((context.NewState.IsFinal && !FailedState.StateName.Equals(context.OldStateName, StringComparison.OrdinalIgnoreCase)) ||
context.NewState is FailedState)
{
TrySetRunningFlag(context, JobIsNotRunning);
}
}
public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
{
}
private static void TrySetRunningFlag(ApplyStateContext context, string state)
{
// We can't handle old storages
if (context.Connection is not JobStorageConnection connection)
{
throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
}
var recurringJobId = context.GetJobParameter<string>(RecurringJobIdKey, allowStale: true);
if (string.IsNullOrWhiteSpace(recurringJobId))
{
return;
}
if (context.Storage.HasFeature(JobStorageFeatures.Transaction.AcquireDistributedLock))
{
// Acquire a lock in newer storages to avoid race conditions
((JobStorageTransaction)context.Transaction).AcquireDistributedLock(
$"lock:recurring-job:{recurringJobId}",
timeout: TimeSpan.FromSeconds(10));
}
// Checking whether a recurring job exists
var recurringJob = connection.GetValueFromHash(CreateRecurringJobKey(recurringJobId), "Job");
if (string.IsNullOrEmpty(recurringJob))
{
return;
}
// Changing the running state
var jobId = JobIsRunning.Equals(state, StringComparison.OrdinalIgnoreCase)
? context.BackgroundJob.Id
: string.Empty;
context.Transaction.SetRangeInHash(
CreateRecurringJobKey(recurringJobId),
[
new KeyValuePair<string, string>(RunningKey, state),
new KeyValuePair<string, string>(RunningJobIdKey, jobId),
]);
}
private static void ResetJobStatus(JobStorageConnection connection, string recurringJobId)
{
using var tx = connection.CreateWriteTransaction();
tx.SetRangeInHash(
CreateRecurringJobKey(recurringJobId),
[
new KeyValuePair<string, string>(RunningKey, JobIsNotRunning),
new KeyValuePair<string, string>(RunningJobIdKey, string.Empty),
]);
tx.Commit();
}
private static bool IsJobStale(JobStorageConnection connection, string? runningJobId)
{
if (string.IsNullOrWhiteSpace(runningJobId))
{
return true;
}
var jobData = connection.GetJobData(runningJobId);
if (jobData is null)
{
return true;
}
return jobData.State == SucceededState.StateName
|| jobData.State == DeletedState.StateName
|| jobData.State == FailedState.StateName;
}
private static string CreateRecurringJobKey(string recurringJobId) => $"recurring-job:{recurringJobId}";
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment