Health Monitoring and Task Reassignment in our Service Bus Applications

FubuMVC 3.0 actually has a full-blown service bus framework that started as an add-on project called “FubuTransportation.” We’ve used it in production for 3 years, we’re generally happy with it, and it’s the main reason why we’ve done an about-face and decided to continue with FubuMVC again.

Corey Kaylor and I have been actively working on FubuMVC again. We’re still planning a reboot of at least the service bus functionality to the CoreCLR with a more efficient architecture next year (“Jasper”), but for right now we’re just working to improve the performance and reliability of our existing service bus applications. The “health monitoring” and persistent task functionality explained here has been in our codebase for a couple of years and used a little bit in production, but we’re about to try to use it for something mission-critical for the first time. I’d love to have any feedback or suggestions for improvements you might have. All the code shown here is pulled from this namespace in GitHub.

A Distributed System Spread Over Several Nodes

For the sake of both reliability and the potential for horizontal scaling later, we want to be able to deploy multiple instances of our distributed application to different servers (or as separate processes on the same box), as shown below:

 

[Image: A distributed application behind a load balancer]

We generally employ hardware load balancers to distribute incoming requests across all the available nodes. So far, all of this is pretty typical and relatively straightforward as long as any node can service any request.

However, what if your architecture includes some kind of stateful “agent” that can, or at least should, be active on only one of the nodes at a time?

I’m hesitant to describe what we’re doing as Agent Oriented Programming, but that’s what I’m drawing on to think through this a little bit.

[Image: “Agent” worker processes should only be running on a single node]

In our case, we’re working with a system that is constantly updating a “grid” of information stored in memory and directing work through our call centers. Needless to say, it’s a mission-critical process. What we’re attempting to do is have the active “agent” for that planning grid managed by FubuMVC’s service bus functionality so that it’s always running on exactly one node in the cluster. That means that we need to be able to:

  • Have the various nodes constantly checking up on each other to make sure that agent is running somewhere and the assigned node is actually up and responsive
  • Be able to initiate the assignment of that agent to a new node if it is not running at all
  • Potentially shut down any extraneous instances of that agent if there is more than one running

Years ago, Chris Patterson of MassTransit fame explained something to me called the Bully Algorithm that can be used for exactly this kind of scenario. With a lot of help from my colleague Ryan Hauert, we came up with the approach described in this post.

 

Persistent Tasks

I reserve the right to change the name later (IAgent maybe?), but for now the key interface for one of these sticky agents is shown below:

public interface IPersistentTask
{
    Uri Subject { get; }

    // This is supposed to be the health check
    // Should throw an exception if anything is wrong;)
    void AssertAvailable();
    void Activate();
    void Deactivate();
    bool IsActive { get; }

    // This method would perform the actual assignment
    Task<ITransportPeer> SelectOwner(IEnumerable<ITransportPeer> peers);
}

Hopefully the interface is largely self-descriptive. We were already using Uris throughout the rest of the code, so it made sense to use them to identify the persistent tasks as well. This interface gives developers the hooks to start or stop the task, a way to do health checks, and a way to apply whatever kind of custom owner selection algorithm you want.
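As a purely illustrative example, an implementation for the planning grid agent described earlier might look roughly like the sketch below. PlanningGridTask and IPlanningGrid are invented names for this example, not real classes from our codebase:

public class PlanningGridTask : IPersistentTask
{
    // Hypothetical service that loads and maintains the in-memory grid
    private readonly IPlanningGrid _grid;

    public PlanningGridTask(IPlanningGrid grid)
    {
        _grid = grid;
    }

    public Uri Subject
    {
        get { return new Uri("task://planning/grid"); }
    }

    // Throw if the grid is unresponsive or in a bad state
    public void AssertAvailable()
    {
        _grid.AssertResponsive();
    }

    public void Activate()
    {
        _grid.Start();
    }

    public void Deactivate()
    {
        _grid.Stop();
    }

    public bool IsActive
    {
        get { return _grid.IsRunning; }
    }

    public Task<ITransportPeer> SelectOwner(IEnumerable<ITransportPeer> peers)
    {
        // Trivial selection for the sketch; see the OrderedAssignment
        // example later in this post for a more useful strategy
        return Task.FromResult(peers.FirstOrDefault());
    }
}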

These persistent tasks are added to a FubuMVC application by registering an implementation of the IPersistentTaskSource interface shown below into the application container (there is a simple recipe for standalone tasks that deals with both interfaces in one class):

public interface IPersistentTaskSource
{
    // The scheme or protocol from the task Uri's
    string Protocol { get; }

    // Subjects of all the tasks built by this
    // object that should be running
    IEnumerable<Uri> PermanentTasks();

    // Create a task object for the given subject
    IPersistentTask CreateTask(Uri uri);
}

The IPersistentTaskSource might end up going away as unnecessary complexity in favor of just registering IPersistentTask implementations directly. It was built with the idea of running, assigning, and monitoring agents per customer/tenant/region/etc. I’ve built a couple of systems in the past half decade where it would have been very advantageous to have had that functionality.
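To make that per-tenant idea concrete, a task source might look something like the sketch below. Everything tenant-related here (ITenantRepository, TenantGridTask, and the "tenant-grid" scheme) is invented for the example:

public class TenantGridTaskSource : IPersistentTaskSource
{
    // Hypothetical service that knows about all the active tenants
    private readonly ITenantRepository _tenants;

    public TenantGridTaskSource(ITenantRepository tenants)
    {
        _tenants = tenants;
    }

    public string Protocol
    {
        get { return "tenant-grid"; }
    }

    public IEnumerable<Uri> PermanentTasks()
    {
        // One "sticky" task per tenant, e.g. tenant-grid://acme
        return _tenants.AllTenantIds().Select(id => new Uri("tenant-grid://" + id));
    }

    public IPersistentTask CreateTask(Uri uri)
    {
        // TenantGridTask would be an IPersistentTask scoped to a single
        // tenant (also invented for this sketch)
        var tenantId = uri.Host;
        return new TenantGridTask(tenantId, _tenants);
    }
}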

The ITransportPeer interface used in the SelectOwner() method models the available nodes and is described in the next section.

 

Modeling the Nodes

The available nodes are modeled by the ITransportPeer shown below:

public interface ITransportPeer
{
    // Try to make this node take ownership of a task
    Task<OwnershipStatus> TakeOwnership(Uri subject);

    // Ask the peer for the status of all of its assigned tasks
    Task<TaskHealthResponse> CheckStatusOfOwnedTasks();

    void RemoveOwnershipFromNode(IEnumerable<Uri> subjects);

    IEnumerable<Uri> CurrentlyOwnedSubjects();

    string NodeId { get; }
    string MachineName { get; }
    Uri ControlChannel { get; }

    // Shut down a running task
    Task<bool> Deactivate(Uri subject);
}

ITransportPeer comes in just two flavors:

  1. A class called PersistentTaskController that directly controls and manages the tasks on the executing node.
  2. A class called TransportPeer that represents one of the external nodes. The methods in this version send messages to the control channel of the node represented by the peer object and wait for a matching response. The other nodes will consume those messages and make the right calls on the local PersistentTaskController.
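To give a feel for what the second flavor does, here is a rough sketch of how a remote peer might relay TakeOwnership(). The message types and the request/reply helper below are stand-ins invented for the example, not FubuMVC’s actual messaging API:

// Invented request/reply message types for the sketch
public class TakeOwnershipRequest
{
    public Uri Subject { get; set; }
}

public class TakeOwnershipResponse
{
    public OwnershipStatus Status { get; set; }
}

// Hypothetical stand-in for the service bus request/reply mechanism
public interface IRequestReplyClient
{
    Task<TResponse> Request<TResponse>(Uri destination, object message);
}

public class RemotePeerSketch
{
    private readonly IRequestReplyClient _client;
    private readonly Uri _controlChannel;

    public RemotePeerSketch(IRequestReplyClient client, Uri controlChannel)
    {
        _client = client;
        _controlChannel = controlChannel;
    }

    // Relays the ownership request to the node behind the control
    // channel and waits for its matching reply
    public async Task<OwnershipStatus> TakeOwnership(Uri subject)
    {
        var response = await _client
            .Request<TakeOwnershipResponse>(_controlChannel, new TakeOwnershipRequest { Subject = subject })
            .ConfigureAwait(false);

        return response.Status;
    }
}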

 

Reassigning Tasks

Now that we have a way to hook in tasks and a way to model the available peers, we need some kind of mechanism within IPersistentTask classes to execute the reassignment. So far, the only thing we’ve built and used is a simple algorithm that assigns a task based on an order of preference, using the OrderedAssignment class shown below:

public class OrderedAssignment
{
	private readonly Uri _subject;
	private readonly ITransportPeer[] _peers;
	private int _index;

	public OrderedAssignment(Uri subject, IEnumerable<ITransportPeer> peers)
	{
		_subject = subject;
		_peers = peers.ToArray();
		_index = 0;
	}

	public async Task<ITransportPeer> SelectOwner()
	{
		return await tryToSelect().ConfigureAwait(false);
	}

	private async Task<ITransportPeer> tryToSelect()
	{
		var transportPeer = _peers[_index++];

		try
		{
			var status = await transportPeer.TakeOwnership(_subject).ConfigureAwait(false);

			if (status == OwnershipStatus.AlreadyOwned || status == OwnershipStatus.OwnershipActivated)
			{
				return transportPeer;
			}
		}
		catch (Exception e)
		{
			Debug.WriteLine(e);
		}

		if (_index >= _peers.Length) return null;

		return await tryToSelect().ConfigureAwait(false);
	}
}

Inside of an IPersistentTask class, the ordered assignment could be used something like this:

public virtual Task<ITransportPeer> SelectOwner(IEnumerable<ITransportPeer> peers)
{
    // it's lame, but just order by the control channel Uri
    var ordered = peers.OrderBy(x => x.ControlChannel.ToString());
    var completion = new OrderedAssignment(Subject, ordered);

    return completion.SelectOwner();
}

 

Health Monitoring via the Bully Algorithm

So now we have a way to model persistent tasks, reassign tasks, and model the connectivity to all the available nodes.

Inside of PersistentTaskController is this method that checks the state of all the known persistent tasks on every known running node:

public async Task EnsureTasksHaveOwnership()
{
    // Go run out and check the status of all the tasks that are
    // theoretically running on each node
    var healthChecks = AllPeers().Select(async x =>
    {
        var status = await x.CheckStatusOfOwnedTasks().ConfigureAwait(false);
        return new { Peer = x, Response = status };
    }).ToArray();

    var checks = await Task.WhenAll(healthChecks).ConfigureAwait(false);

    // Determine what corrective steps, if any, should be taken
    // to ensure that every known task is running in just one place
    var planner = new TaskHealthAssignmentPlanner(_permanentTasks);
    foreach (var check in checks)
    {
        planner.Add(check.Peer, check.Response);
    }

    var corrections = planner.ToCorrectionTasks(this);

    await Task.WhenAll(corrections).ConfigureAwait(false);

    _logger.Info(() => "Finished running task health monitoring on node " + NodeId);
}

In combination with the TaskHealthAssignmentPlanner class, this method is able to jumpstart any known tasks that either aren’t running or were running on a node that is no longer reachable or reports that its tasks are in an error state.
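The planner itself isn’t shown here, but conceptually its per-task bookkeeping boils down to something like the simplified sketch below. This is an illustration of the idea, not the real TaskHealthAssignmentPlanner:

// A simplified sketch of the kind of per-task bookkeeping the planner
// does; the real TaskHealthAssignmentPlanner is more involved
public class TaskCorrectionSketch
{
    public Uri Subject { get; set; }

    // Peers that report this task as healthy and active
    public readonly IList<ITransportPeer> HealthyOwners = new List<ITransportPeer>();

    // Peers that own the task but report it in an error state
    public readonly IList<ITransportPeer> FaultedOwners = new List<ITransportPeer>();

    public IEnumerable<Task> ToCorrections(IPersistentTask task, IEnumerable<ITransportPeer> allPeers)
    {
        // Nobody is running the task successfully, so reassign it
        if (!HealthyOwners.Any())
        {
            yield return task.SelectOwner(allPeers);
        }

        // More than one node thinks it owns the task, so shut down the extras
        foreach (var extra in HealthyOwners.Skip(1))
        {
            yield return extra.Deactivate(task.Subject);
        }

        // Tell faulted owners to stop the task before it gets reassigned
        foreach (var faulted in FaultedOwners)
        {
            yield return faulted.Deactivate(task.Subject);
        }
    }
}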

The EnsureTasksHaveOwnership() method is called from a system-level polling job running in a FubuMVC application. There’s an important little twist on that though. To reduce the chance of unpredictable behavior from the health monitoring checks running on each node simultaneously, the timing of the polling interval is randomized by this settings class:

public double Interval
{
    get
    {
        // The *first* execution of the health monitoring takes
        // place 100 ms after the app is initialized
        if (_initial)
        {
            _initial = false;
            return 100;
        }
                
        // After the first call, the polling interval is randomized
        // between each call
        return Random.Next(MinSeconds, MaxSeconds) * 1000;
    }
}
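For context, that property lives in a settings class whose remaining members look roughly like this. The names, bounds, and default values in this sketch are illustrative guesses, not copied from the real class:

// A sketch of the rest of the settings class; names and default
// values here are guesses for illustration only
public class HealthMonitoringSettingsSketch
{
    private static readonly Random Random = new Random();
    private bool _initial = true;

    // Lower and upper bounds, in seconds, for the randomized
    // polling interval
    public int MinSeconds { get; set; } = 30;
    public int MaxSeconds { get; set; } = 60;

    // ...plus the Interval property shown above
}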

I found an article online advising you to randomize the intervals at the time we were building this two years ago, but I don’t remember where that was :(

By using the bully algorithm, we’re effectively able to make a cluster of related nodes check up on each other and start up or reassign any tasks that have gone down. We’re utilizing this first to do a “ready standby” failover of an existing system.

Actually Doing the Health Checks

The health check needs to run some kind of “heartbeat” action implemented through the IPersistentTask.AssertAvailable() method on each persistent task object to ensure that it’s really up and functioning. The following code is taken from PersistentTaskController where it does a health check on each running local task:

public async Task<TaskHealthResponse> CheckStatusOfOwnedTasks()
{
	// Figure out which tasks are running on this node right now
	var subjects = CurrentlyOwnedSubjects().ToArray();

	if (!subjects.Any())
	{
		return TaskHealthResponse.Empty();
	}

	// Check the status of each running task by calling the
	// IPersistentTask.AssertAvailable() method
	var checks = subjects
		.Select(async subject =>
		{
			var status = await CheckStatus(subject).ConfigureAwait(false);
			
			return new PersistentTaskStatus(subject, status);
		})
		.ToArray();

	var statusList = await Task.WhenAll(checks).ConfigureAwait(false);

	return new TaskHealthResponse
	{
		Tasks = statusList.ToArray()
	};
}

public async Task<HealthStatus> CheckStatus(Uri subject)
{
	var agent = _agents[subject];

	return agent == null 
		? HealthStatus.Unknown 
		: await checkStatus(agent).ConfigureAwait(false);
}

private static async Task<HealthStatus> checkStatus(IPersistentTaskAgent agent)
{
	return agent.IsActive
		? await agent.AssertAvailable().ConfigureAwait(false)
		: HealthStatus.Inactive;
}
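The IPersistentTaskAgent seen above is a wrapper around IPersistentTask. The sketch below approximates how it turns the void, exception-throwing AssertAvailable() into a HealthStatus value; the HealthStatus.Active and HealthStatus.Error members are assumed here, and the real agent class does more than this:

// An approximation of the agent wrapper's health check translation;
// not the real IPersistentTaskAgent implementation
public class PersistentTaskAgentSketch
{
    private readonly IPersistentTask _task;

    public PersistentTaskAgentSketch(IPersistentTask task)
    {
        _task = task;
    }

    public bool IsActive
    {
        get { return _task.IsActive; }
    }

    public Task<HealthStatus> AssertAvailable()
    {
        return Task.Run(() =>
        {
            try
            {
                // The task signals a problem by throwing
                _task.AssertAvailable();
                return HealthStatus.Active;
            }
            catch (Exception)
            {
                // Assumed enum member for a failed health check
                return HealthStatus.Error;
            }
        });
    }
}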

 

 

Subscription Storage

Another obvious challenge is how each node “knows” about its peers. FubuMVC pulls that off with its “subscription” subsystem. In our case, each node writes information about itself to a shared persistence store (mostly backed by RavenDb in our ecosystem, but we’re moving that to Marten). The subscription persistence also enables each node to discover its peers.
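Conceptually, the record each node persists about itself carries at least the information below. This is a simplified sketch rather than the actual persisted document:

// A simplified sketch of the per-node record kept in subscription
// storage, not the actual persisted document
public class NodeRecordSketch
{
    // Uniquely identifies the running node
    public string NodeId { get; set; }

    // The server (or process) the node is running on
    public string MachineName { get; set; }

    // Where peers should send control messages for this node
    public Uri ControlChannel { get; set; }

    // Subjects of any persistent tasks this node currently owns
    public IList<Uri> OwnedTasks { get; set; } = new List<Uri>();
}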

Once the subscriptions are established, each node can communicate with all of its peers through the control channel addresses in the subscription storage. That basic architecture is shown below with the obligatory boxes and arrows diagram:

[Image: Subscriptions]

The subscription storage was originally written to enable dynamic message subscriptions between systems, but it’s also enabled our health monitoring strategy shown in this post.

 

 

Control Queue

We need the health monitoring and subscription messages between the various nodes to be fast and reliable. We don’t want the system-level messages getting stuck in queues that might be backed up with normal activity. To that end, we finally added the idea of a “control channel” to FubuMVC so that you can designate a single channel as the preferred mechanism for sending control messages.

The syntax for making that designation is shown below in a code sample taken from FubuMVC’s integrated testing:

public ServiceRegistry()
{
    // The service bus functionality is "opt in"
    ServiceBus.Enable(true);

    // I explain what "Service" is in the next code sample
    Channel(x => x.Service)
        // Listen for incoming messages on this channel
        .ReadIncoming()

        // Designate this channel as preferred for system level messages         
        .UseAsControlChannel()

        // Opts into LightningQueue's non-persistent mode               
        .DeliveryFastWithoutGuarantee(); 

    // I didn't want the health monitoring running on this node
    ServiceBus.HealthMonitoring
        .ScheduledExecution(ScheduledExecution.Disabled);
}

 

If you’re wondering what in the world “x => x.Service” refers to in the code above, that just ties into FubuMVC’s strongly typed configuration support (effectively the same concept as the new IOptions configuration in ASP.Net Core, just with less cruft ;) ). The application described by the ServiceRegistry shown above also includes a class that holds configuration items specific to this application:

public class TestBusSettings
{
    public Uri Service { get; set; } = "lq.tcp://localhost:2215/service".ToUri();
    public Uri Website { get; set; } = "lq.tcp://localhost:2216/website".ToUri();
}

The primary transport mechanism we use is LightningQueues (LQ), an OSS library built and maintained by my colleague Corey Kaylor. LQ is normally a “store and forward” queue, but it has a new, opt-in “non-persistent” mode (like ZeroMQ, except .Net friendly) that we can exploit for our control channels in FubuMVC. In the case of the control queues, it’s advantageous not to persist those messages anyway.

 

My Concerns

It’s damn complicated, and testing it was obscenely hard. I’m a little worried about network hiccups causing it to unnecessarily try to reassign tasks. We might put some additional retries into the health checks. The central subscription persistence is a bit of a concern too because it’s a single point of failure.
