Saturday, May 24, 2014

Monitorable Grains

In the Orleans world, grains are somewhat 'lost' by default. You can activate grains, and the runtime takes care of their lifecycle and decides when they are deactivated. For my scenario, I need a kind of grain that is supervised by a Supervisor (or UniverseGrain). The Supervisor can, for instance:

- keep track of the amount of activated grains
- keep the history of heartbeats
- notify others on the heartbeat
- more housekeeping jobs

By default, grains don't contain this kind of functionality. I created a new type of grain as an abstract class called MonitorableGrain, indicating that it can be monitored somehow. What does the MonitorableGrain do by default?

- on activation, register itself at the SupervisorGrain
- by using a timer, send out a heartbeat at a fixed interval (every 15 seconds in the code below) to the supervisor to indicate it is still alive
- on deactivation, unregister itself at the supervisor.

All my future grains will derive from the abstract MonitorableGrain and inherit this behaviour by default. The IMonitorableGrain interface contains no methods yet. See below for the code.

/// <summary>
/// Base grain that reports its lifecycle to the supervisor grain: it registers
/// itself on activation, sends a periodic heartbeat while it is activated, and
/// unregisters itself on deactivation.
/// </summary>
/// <remarks>
/// NOTE(review): the accompanying text describes this type as abstract, but the
/// declaration is not marked <c>abstract</c>; confirm which one is intended.
/// </remarks>
public class MonitorableGrain : Orleans.GrainBase, IMonitorableGrain, Orleans.IGrain
{
    /// <summary>Time between two consecutive heartbeats.</summary>
    private static readonly TimeSpan HeartBeatPeriod = TimeSpan.FromSeconds(15);

    /// <summary>Handle of the heartbeat timer; null until activation has started it.</summary>
    IOrleansTimer heartBeatTimer = null;

    /// <summary>Reference to the supervisor; null until activation has resolved it.</summary>
    ISupervisorGrain superVisorGrain = null;

    /// <summary>
    /// Registers this grain at the supervisor and starts the heartbeat timer.
    /// </summary>
    /// <returns>A task that completes once registration and the base activation have completed.</returns>
    public override async Task ActivateAsync()
    {
        Trace.TraceInformation("Activation!");

        // Assign the FIELD (the original declared a local with the same name, which left
        // the field null and made HeartBeat/DeactivateAsync throw NullReferenceException).
        superVisorGrain = SupervisorGrainFactory.GetGrain(0);

        // Register before starting the timer so the supervisor never receives a
        // heartbeat from a grain it has not been told about; await so a failed
        // registration is observed instead of being silently dropped.
        await superVisorGrain.RegisterGrain(this);

        // Timer instead of Reminder since we are only interested in activated grains.
        // Keep the returned handle so it can be disposed on deactivation.
        heartBeatTimer = RegisterTimer(HeartBeat, this, TimeSpan.Zero, HeartBeatPeriod);

        await base.ActivateAsync();
    }

    /// <summary>
    /// Timer callback: traces the heartbeat and forwards it to the supervisor.
    /// </summary>
    /// <param name="grain">The timer state object; activation passes this grain instance.</param>
    /// <returns>The task of the supervisor call, so a failure surfaces to the timer instead of being lost.</returns>
    private Task HeartBeat(object grain)
    {
        Trace.WriteLine(String.Format("Heartbeat of {0} at {1}", this.GetPrimaryKey().ToString(), DateTime.Now.ToLongTimeString()));
        return superVisorGrain.HeartBeat(grain as IMonitorableGrain, "I am still alive");
    }

    /// <summary>
    /// Stops the heartbeat timer and unregisters this grain at the supervisor.
    /// </summary>
    /// <returns>A task that completes once unregistration and the base deactivation have completed.</returns>
    public override async Task DeactivateAsync()
    {
        Trace.WriteLine("Deactivation!");

        // Stop the heartbeats first so none is sent after the grain has unregistered.
        // Both fields are null-checked because activation may have failed part-way.
        if (heartBeatTimer != null)
        {
            heartBeatTimer.Dispose();
            heartBeatTimer = null;
        }

        if (superVisorGrain != null)
        {
            await superVisorGrain.UnregisterGrain(this);
        }

        await base.DeactivateAsync();
    }
}

In the snippets below you will see the ISupervisorGrain interface and the implementation.

/// <summary>
/// Contract of the supervisor grain that monitorable grains report to.
/// </summary>
public interface ISupervisorGrain : Orleans.IGrain
{
    /// <summary>Adds a grain to the set of supervised grains.</summary>
    /// <param name="monitorableGrain">The grain to register.</param>
    Task RegisterGrain(IMonitorableGrain monitorableGrain);

    /// <summary>Removes a grain from the set of supervised grains.</summary>
    /// <param name="monitorableGrain">The grain to unregister.</param>
    Task UnregisterGrain(IMonitorableGrain monitorableGrain);

    /// <summary>Reports a heartbeat of a grain to the supervisor.</summary>
    /// <param name="grain">The grain the heartbeat belongs to.</param>
    /// <param name="heartBeatMessage">Free-form message attached to the heartbeat.</param>
    Task HeartBeat(IMonitorableGrain grain, string heartBeatMessage);

    /// <summary>Gets the number of grains known to the supervisor.</summary>
    /// <returns>A task yielding the grain count.</returns>
    Task<int> NumberOfGrains();
}

/// <summary>
/// A single heartbeat record: which grain sent it, when, and with what message.
/// </summary>
public class HeartBeat
{
    /// <summary>
    /// Creates a heartbeat record from its three components.
    /// </summary>
    /// <param name="grainId">Identifier of the grain the heartbeat belongs to.</param>
    /// <param name="heartBeatTime">Timestamp of the heartbeat.</param>
    /// <param name="heartBeatMessage">Message attached to the heartbeat.</param>
    public HeartBeat(Guid grainId, DateTime heartBeatTime, string heartBeatMessage)
    {
        GrainId = grainId;
        HeartBeatTime = heartBeatTime;
        HeartBeatMessage = heartBeatMessage;
    }

    /// <summary>Identifier of the grain the heartbeat belongs to.</summary>
    public Guid GrainId { get; set; }

    /// <summary>Timestamp of the heartbeat.</summary>
    public DateTime HeartBeatTime { get; set; }

    /// <summary>Message attached to the heartbeat.</summary>
    public string HeartBeatMessage { get; set; }
}

/// <summary>
/// Supervisor grain: keeps the set of registered monitorable grains and a
/// history of the heartbeats they have sent.
/// </summary>
public class SupervisorGrain : Orleans.GrainBase, ISupervisorGrain
{
    // Registered grains keyed by primary key. The generic type arguments were
    // missing in the original listing, which does not compile without them.
    private readonly Dictionary<Guid, IMonitorableGrain> grains = new Dictionary<Guid, IMonitorableGrain>();

    // History of heartbeats, in arrival order.
    private readonly List<HeartBeat> heartBeats = new List<HeartBeat>();

    /// <summary>Traces the activation and defers to the base implementation.</summary>
    public override Task ActivateAsync()
    {
        Trace.TraceInformation("Activation");
        return base.ActivateAsync();
    }

    /// <summary>Gets the number of currently registered grains.</summary>
    /// <returns>A completed task yielding the size of the registration table.</returns>
    public Task<int> NumberOfGrains()
    {
        Trace.TraceInformation("NumberOfGrains");
        return Task.FromResult<int>(grains.Count);
    }

    /// <summary>Registers a grain under its primary key.</summary>
    /// <param name="monitorableGrain">The grain to register.</param>
    public Task RegisterGrain(IMonitorableGrain monitorableGrain)
    {
        Trace.TraceInformation("RegisterGrain");

        // Indexer instead of Add: registering the same key again (for example a
        // re-activation whose earlier unregister never arrived) replaces the
        // entry instead of throwing ArgumentException.
        grains[monitorableGrain.GetPrimaryKey()] = monitorableGrain;
        return TaskDone.Done;
    }

    /// <summary>Appends a heartbeat of the given grain to the history.</summary>
    /// <param name="grain">The grain that sent the heartbeat.</param>
    /// <param name="heartBeatMessage">Message attached to the heartbeat.</param>
    public Task HeartBeat(IMonitorableGrain grain, string heartBeatMessage)
    {
        Trace.TraceInformation("HeartBeat");
        heartBeats.Add(new HeartBeat(grain.GetPrimaryKey(), DateTime.Now, heartBeatMessage));
        return TaskDone.Done;
    }

    /// <summary>Removes a grain from the registration table; unknown keys are ignored.</summary>
    /// <param name="monitorableGrain">The grain to unregister.</param>
    public Task UnregisterGrain(IMonitorableGrain monitorableGrain)
    {
        Trace.TraceInformation("UnregisterGrain");
        grains.Remove(monitorableGrain.GetPrimaryKey());
        return TaskDone.Done;
    }
}

// NOTE(review): this listing repeats the SupervisorGrain class that already
// appears earlier in this post; only one of the two declarations can be compiled.

/// <summary>
/// Supervisor grain: keeps the set of registered monitorable grains and a
/// history of the heartbeats they have sent.
/// </summary>
public class SupervisorGrain : Orleans.GrainBase, ISupervisorGrain
{
    // Registered grains keyed by primary key. The generic type arguments were
    // missing in the original listing, which does not compile without them.
    private readonly Dictionary<Guid, IMonitorableGrain> grains = new Dictionary<Guid, IMonitorableGrain>();

    // History of heartbeats, in arrival order.
    private readonly List<HeartBeat> heartBeats = new List<HeartBeat>();

    /// <summary>Traces the activation and defers to the base implementation.</summary>
    public override Task ActivateAsync()
    {
        Trace.TraceInformation("Activation");
        return base.ActivateAsync();
    }

    /// <summary>Gets the number of currently registered grains.</summary>
    /// <returns>A completed task yielding the size of the registration table.</returns>
    public Task<int> NumberOfGrains()
    {
        Trace.TraceInformation("NumberOfGrains");
        return Task.FromResult<int>(grains.Count);
    }

    /// <summary>Registers a grain under its primary key.</summary>
    /// <param name="monitorableGrain">The grain to register.</param>
    public Task RegisterGrain(IMonitorableGrain monitorableGrain)
    {
        Trace.TraceInformation("RegisterGrain");

        // Indexer instead of Add: registering the same key again (for example a
        // re-activation whose earlier unregister never arrived) replaces the
        // entry instead of throwing ArgumentException.
        grains[monitorableGrain.GetPrimaryKey()] = monitorableGrain;
        return TaskDone.Done;
    }

    /// <summary>Appends a heartbeat of the given grain to the history.</summary>
    /// <param name="grain">The grain that sent the heartbeat.</param>
    /// <param name="heartBeatMessage">Message attached to the heartbeat.</param>
    public Task HeartBeat(IMonitorableGrain grain, string heartBeatMessage)
    {
        Trace.TraceInformation("HeartBeat");
        heartBeats.Add(new HeartBeat(grain.GetPrimaryKey(), DateTime.Now, heartBeatMessage));
        return TaskDone.Done;
    }

    /// <summary>Removes a grain from the registration table; unknown keys are ignored.</summary>
    /// <param name="monitorableGrain">The grain to unregister.</param>
    public Task UnregisterGrain(IMonitorableGrain monitorableGrain)
    {
        Trace.TraceInformation("UnregisterGrain");
        grains.Remove(monitorableGrain.GetPrimaryKey());
        return TaskDone.Done;
    }
}





I will continue to improve the MonitorableGrain and SupervisorGrain, but this is a good start.






No comments:

Post a Comment