Saturday, May 24, 2014

Monitorable Grains

In the Orleans world, grains are somewhat 'lost' by default. You can activate grains, and the runtime takes care of their lifecycle and decides when they are deactivated. For my scenario, I need grains that are supervised by a Supervisor (or UniverseGrain). The Supervisor can, for instance:

- keep track of the number of activated grains
- keep the history of heartbeats
- notify others on the heartbeat
- more housekeeping jobs

By default, grains don't contain this kind of functionality. I created a new type of grain as an abstract class called MonitorableGrain, indicating that it can be monitored somehow. What does the MonitorableGrain do by default?

- on activation, register itself at the SupervisorGrain
- using a timer, send a heartbeat to the supervisor at a fixed interval (every 15 seconds in the code below) to indicate it is still alive
- on deactivation, unregister itself at the supervisor.

All my future grains will derive from the abstract MonitorableGrain and inherit this behaviour by default. The IMonitorableGrain interface contains no methods yet. See below for the code.

public abstract class MonitorableGrain : Orleans.GrainBase, IMonitorableGrain, Orleans.IGrain
    {
        IOrleansTimer heartBeatTimer = null;
        ISupervisorGrain superVisorGrain = null;

        public override async Task ActivateAsync()
        {
            Trace.TraceInformation("Activation!");
            //assign the field (not a local variable) so HeartBeat and DeactivateAsync can use it
            superVisorGrain = SupervisorGrainFactory.GetGrain(0);
            //register the grain at the superVisor
            await superVisorGrain.RegisterGrain(this);
            //create a timer that executes a heartbeat. Timer instead of Reminder since we are only interested in activated grains
            //keep the timer handle so it can be disposed on deactivation
            heartBeatTimer = RegisterTimer(HeartBeat, this, TimeSpan.FromMinutes(0), TimeSpan.FromSeconds(15));

            await base.ActivateAsync();
        }


        private Task HeartBeat(object grain)
        {
            Trace.WriteLine(String.Format("Heartbeat of {0} at {1}", this.GetPrimaryKey().ToString(), DateTime.Now.ToLongTimeString()));
            //return the call's Task so a failing heartbeat is not silently dropped
            return superVisorGrain.HeartBeat(grain as IMonitorableGrain, "I am still alive");
        }

        public override async Task DeactivateAsync()
        {
            Trace.WriteLine("Deactivation!");
            //stop the heartbeat first, then unregister at the superVisor
            if (heartBeatTimer != null) heartBeatTimer.Dispose();
            await superVisorGrain.UnregisterGrain(this);
            await base.DeactivateAsync();
        }
    }

In the snippets below you will see the ISupervisorGrain interface and the implementation.

public interface ISupervisorGrain : Orleans.IGrain
{
   Task<int> NumberOfGrains();
   Task RegisterGrain(IMonitorableGrain monitorableGrain);
   Task UnregisterGrain(IMonitorableGrain monitorableGrain);
   Task HeartBeat(IMonitorableGrain grain, string heartBeatMessage);
}

public class HeartBeat 
    {
        private Guid grainId;

        public Guid GrainId
        {
          get { return grainId; }
          set { grainId = value; }
        }

        private DateTime heartBeatTime;

        public DateTime HeartBeatTime
        {
          get { return heartBeatTime; }
          set { heartBeatTime = value; }
        }

        private string heartBeatMessage;

        public string HeartBeatMessage
        {
          get { return heartBeatMessage; }
          set { heartBeatMessage = value; }
        }

        public HeartBeat(Guid grainId, DateTime heartBeatTime, string heartBeatMessage)
        {
            this.grainId = grainId;
            this.heartBeatTime = heartBeatTime;
            this.heartBeatMessage = heartBeatMessage;
        }
    }

public class SupervisorGrain : Orleans.GrainBase, ISupervisorGrain
    {
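        //list of grains, keyed by their primary key
        Dictionary<Guid, IMonitorableGrain> grains = new Dictionary<Guid, IMonitorableGrain>();
        //history of heartbeats
        List<HeartBeat> heartBeats = new List<HeartBeat>();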

        public override Task ActivateAsync()
        {
            Trace.TraceInformation("Activation");
            return base.ActivateAsync();
        }

        public Task<int> NumberOfGrains()
        {
            Trace.TraceInformation("NumberOfGrains");
            return Task.FromResult<int>(grains.Count);
        }


        public Task RegisterGrain(IMonitorableGrain monitorableGrain)
        {
            Trace.TraceInformation("RegisterGrain");
            //use the indexer: Add would throw if the same grain registers twice (e.g. after a reactivation)
            grains[monitorableGrain.GetPrimaryKey()] = monitorableGrain;
            return TaskDone.Done;
        }

        public Task HeartBeat(IMonitorableGrain grain, string heartBeatMessage)
        {
            Trace.TraceInformation("HeartBeat");
            heartBeats.Add(new HeartBeat(grain.GetPrimaryKey(), DateTime.Now, heartBeatMessage));
            return TaskDone.Done;
                
        }

        public Task UnregisterGrain(IMonitorableGrain monitorableGrain)
        {
            Trace.TraceInformation("UnregisterGrain");
            grains.Remove(monitorableGrain.GetPrimaryKey());
            return TaskDone.Done;
        }
    }
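
One housekeeping job the supervisor could build on the heartbeat history is spotting grains that have gone silent. The following is a minimal, self-contained sketch in plain C# (top-level statements, no Orleans types). The tuple-based history, the fixed "now" timestamp and the 45-second threshold (three missed 15-second heartbeats) are assumptions for illustration and are not part of the grain code above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical heartbeat history: (grain ID, time of the heartbeat).
var now = new DateTime(2014, 5, 24, 12, 0, 0, DateTimeKind.Utc);
var grainA = Guid.NewGuid();
var grainB = Guid.NewGuid();
var heartBeats = new List<(Guid GrainId, DateTime Time)>
{
    (grainA, now.AddSeconds(-10)),
    (grainB, now.AddSeconds(-90)),
    (grainB, now.AddSeconds(-75)),
};

// Assumed threshold: a grain is considered stale after 45 seconds of silence.
var maxSilence = TimeSpan.FromSeconds(45);

// Group the history per grain and keep the grains whose most recent
// heartbeat is older than the threshold.
var staleGrains = heartBeats
    .GroupBy(h => h.GrainId)
    .Where(g => now - g.Max(h => h.Time) > maxSilence)
    .Select(g => g.Key)
    .ToList();

Console.WriteLine("Stale grains: " + staleGrains.Count);
```

In this sample, grainA reported 10 seconds ago and is considered alive, while the latest heartbeat of grainB is 75 seconds old, so only grainB ends up in staleGrains.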

I will continue to improve the MonitorableGrain and SupervisorGrain, but this is a good start.






Thursday, May 15, 2014

Grains and Reminders

The previous blog posts were about Orleans and Grains and how to build up a nice universe of grains. This post will demonstrate how to use reminders.

Reminders are similar to timers, but there are a few differences:
- reminders always fire, even when the grain is not currently activated.
- a grain gets reactivated when its reminder fires.
- reminders should not be used for high-frequency timing purposes (don't fire one every second).

Reminders are a great fit for 'housekeeping' purposes. In my scenario, each device is represented virtually by a grain that keeps the state and history of the device, and a reminder is used to periodically check whether the device out there is still provisioned and part of the IoT game. Consider what happens when I remove a service identity on ACS. After the removal, the device can no longer send its temperatures to the event topic and can no longer receive commands on its subscription. This probably means that the device was turned off, removed or stolen, or that there was some other reason to take it out of the universe. Using a reminder in this scenario can be helpful!

The reminder will perform the following tasks:
- fire every 10 minutes
- check on ACS if the associated service identity is still there
- if the service identity is missing, the reminder will put the device grain in an "inactive" state and move it from the universe grain's devices list to the universe grain's "inactive devices" list
- the reminder will also remove any other ACS-related administration (rule groups, relying party)
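
The housekeeping pass described above can be sketched in plain C# (top-level statements). This is only an illustration of the decision logic under stated assumptions: the acsIdentities set stands in for a real lookup on ACS, and the two lists stand in for the state of the universe grain. None of these names come from the actual solution.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins: the service identities that still exist on ACS,
// and the universe's lists of active and inactive devices.
var acsIdentities = new HashSet<string> { "device-1" };
var activeDevices = new List<string> { "device-1", "device-2" };
var inactiveDevices = new List<string>();

// One housekeeping pass, as it might run on every reminder tick.
// Iterate over a copy, because the list is modified inside the loop.
foreach (var deviceId in activeDevices.ToArray())
{
    if (acsIdentities.Contains(deviceId))
    {
        continue; // still provisioned, nothing to do
    }

    // The identity is gone: move the device to the inactive list.
    activeDevices.Remove(deviceId);
    inactiveDevices.Add(deviceId);
    // Cleaning up rule groups and the relying party on ACS would go here.
}

Console.WriteLine("Active: " + activeDevices.Count + ", inactive: " + inactiveDevices.Count);
```

Here "device-2" no longer has an identity, so it moves to the inactive list while "device-1" stays active.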

Implementing this takes the Device grain out of play and puts the device in a blocked or inactive state (removing a grain is not possible in Orleans, since you can only activate or deactivate a grain). The blocked/inactive device grain appears on the client as a red pushpin, indicating its deactivated state. Obviously, the real device no longer sends any updates, since it was removed, stolen or turned off.

Reminders are persisted (they need to be, since they also get triggered on deactivated grains!).

To make sure your reminders are persistent you need to add the following line to your OrleansConfiguration.xml settings.

ReminderServiceType="AzureTable"/>

To enable reminders on your grain you need to implement the IRemindable interface.


public class DeviceGrain : Orleans.GrainBase, IDeviceGrain, IRemindable

The reminder below starts right away and fires every 10 minutes. It is registered in the RegisterDevice() method of the Device Grain.

private IOrleansReminder reminder;

async Task IDeviceGrain.RegisterDevice()
{
   //start right away (due time zero) and fire every 10 minutes
   reminder = await RegisterOrUpdateReminder("Housekeeping", TimeSpan.FromMinutes(0), TimeSpan.FromMinutes(10));

   var universe = UniverseGrainFactory.GetGrain(0);
   await universe.RegisterDevice(this);
}

public Task ReceiveReminder(string reminderName, TickStatus status)
{
   Trace.WriteLine("Housekeeping reminder is triggered");
   return TaskDone.Done;
}

Explicitly deregistering the device grain, by a user or a system action, also unregisters the reminder we created before. This prevents the reminder from being triggered any further.


public async Task UnregisterDevice()
{
   //reminders are persistent, so explicitly remove the reminder to make sure it goes away
   await UnregisterReminder(reminder);
}

Every time the reminder is triggered, I check my ACS assets to see if the device (its service identity) was removed. Using reminders is a great way to do housekeeping tasks in the Internet of Things world. It keeps both your grain universe and related LOB systems (in my case ACS) as clean as possible and maintains Orleans in a state where it is an exact representation of your real IT assets.

Happy coding!





Thursday, May 8, 2014

Grains, Grains and more Grains

The last two blog posts demonstrated how to use Orleans in the context of the Internet of Things. This post outlines how to set up your universe of Grains (in my scenario a UniverseGrain, a SignalRGrain and multiple DeviceGrains, each representing a real device somewhere out there). The picture below depicts this ecosystem of Grains.




Here are the interfaces of my three types of Grains. The Orleans SDK generates code during compile-time and creates factories for every single interface that has the Orleans.IGrain marker interface.

The UniverseGrain

public interface IUniverseGrain : Orleans.IGrain
    {
        Task<int> NumberOfDevices();
        Task<double> AverageTemperature();
        Task RegisterDevice(IDeviceGrain grain);
        Task<List<IDeviceGrain>> GetDevices();
    }

1. NumberOfDevices() : returns the total number of devices that are registered to the universe
2. AverageTemperature(): asks every single device for its own average temperature and then averages those per-device averages.
3. RegisterDevice(): a DeviceGrain registers itself at the universe
4. GetDevices() : returns a list of registered DeviceGrains. This comes in handy to get a 'snapshot' of the current device universe. This method is called by the SignalR hub in the OnConnected override. Every time a new browser client connects, the hub calls this method on the UniverseGrain to get all the devices and sends this information to the browser, where pushpins are drawn and temperatures are shown.
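
The "average of averages" behind AverageTemperature() can be sketched as follows. This is a self-contained illustration in plain C# (top-level statements), not the actual UniverseGrain code: the histories dictionary is a hypothetical stand-in for the values each DeviceGrain would return from GetAverageTemperature().

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical temperature histories, keyed by device ID.
var histories = new Dictionary<string, List<double>>
{
    ["device-a"] = new List<double> { 20.0, 22.0 },
    ["device-b"] = new List<double> { 30.0 },
};

// Step 1: compute the average per device.
// Step 2: average those per-device averages.
// Devices without readings are skipped; an empty universe yields 0.0.
double universeAverage = histories.Values
    .Where(h => h.Count > 0)
    .Select(h => h.Average())
    .DefaultIfEmpty(0.0)
    .Average();

Console.WriteLine(universeAverage);
```

With these sample values the per-device averages are 21 and 30, so the universe average is 25.5. Note that this weights every device equally: the plain mean over all three readings would be 24, so the two approaches differ as soon as devices report a different number of readings.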

Once a device registers itself at the UniverseGrain, the UniverseGrain also calls the PushNotifierGrain to tell the world about this registration, for example by showing a toast on a webpage.

public async Task RegisterDevice(IDeviceGrain grain)
        {
            //also register at SignalR grain
            grains.Add(grain);
            var notifier = PushNotifierGrainFactory.GetGrain(0);
            await notifier.RegisterDevice(grain);
        }

The DeviceGrain

The DeviceGrain implements several methods. Some of them are obvious, but notice the attribute that decorates the interface.

[ExtendedPrimaryKey]
    public interface IDeviceGrain : Orleans.IGrain
    {
        Task SetLatitude(string latitude);
        Task SetLongitude(string longitude);

        Task<string> GetLatitude();
        Task<string> GetLongitude();
        Task<string> GetDeviceID();
        Task<double> GetAverageTemperature();
        Task<double> GetTemperature();

        Task SetTemperature(double temperature);
        Task RegisterDevice();
    }

By setting this attribute you can use your own proprietary "primary key" mechanism.

var deviceID = Guid.NewGuid().ToString("N");
var device = DeviceGrainFactory.GetGrain(0, deviceID);

The DeviceGrain holds this deviceID primary key by overriding the ActivateAsync method. Later on, this deviceID is also persisted in "State".

public override async Task ActivateAsync()
        {
            //store the deviceID
            this.GetPrimaryKey(out deviceID);
            this.State.DeviceID = deviceID;
            await base.ActivateAsync();
        }

Registration of a device is simple. Just call RegisterDevice on the UniverseGrain and the device is added to a list in the UniverseGrain.

async Task IDeviceGrain.RegisterDevice()
{
            var universe = UniverseGrainFactory.GetGrain(0);
            await universe.RegisterDevice(this);
}

The PushNotifierGrain (the SignalR grain) is decorated with the StatelessWorker attribute. This means that Orleans can create multiple activations of this Grain for throughput reasons. This is only possible for grains with no state or with only immutable state. Since the PushNotifierGrain only holds an "IHubProxy", which is immutable, it qualifies.


    [StatelessWorker]
    public interface IPushNotifierGrain : Orleans.IGrain
    {
        Task SendMessage(string deviceid, string temperature);
        Task SendGeneralMessage(string message);
        Task RegisterDevice(IDeviceGrain device);
        Task UpdateTemperature(IDeviceGrain device);
        Task SendMessage(string message);
    }

When is the temperature set on a device grain? For example, when a real device somewhere in the world reports its temperature on a Service Bus topic. One of the subscriptions on that topic is read by a worker role (the role that maintains the Orleans universe in my scenario). This scales well, since I can scale out this worker role to handle more and more reports from devices.

So, the temperature is set and then the PushNotifierGrain kicks in. The DeviceGrain sets its own current temperature and adds the temperature to its history of temperatures, which is held in its state (so that it can be persisted). Finally, it tells the notifier grain to send this updated temperature to everybody that is involved (in this case, browsers). In my scenario, this results in a change of the text of a pushpin on Bing Maps. The text displays the temperature.

var notifier = PushNotifierGrainFactory.GetGrain(0);
this.State.Temperature = temperature;
temperatures.Add(temperature);
await notifier.UpdateTemperature(this);


Persistency
I want my universe of Grains to be persistent. If my silo goes down for any reason (locally running a silo in a worker role means that it goes down every time I stop debugging...), the state of the grains needs to survive. Which data needs to be persisted? First of all, the current temperature of each and every DeviceGrain, but also the history of temperatures of that particular device. The UniverseGrain also needs some persistence: the next time the silo comes up, I want the DeviceGrains to be there and populated, and the UniverseGrain needs to get its list of DeviceGrains back. So how can I ensure this persistence? I will explain this in one of the next posts.

Observable?
Are Grains Observable? Yes, they are, but more on that topic in the next post! Enjoy Orleans :-) Thanks to http://hilite.me/ for nicely formatting the C# code.

And thanks @Sergey Bykov from the Orleans team for providing some nice insights!


Monday, May 5, 2014

Fine-graining your Orleans Grains inside the IoT universe

My previous post was about the Internet of Things and how MSR Orleans can play an important role in that scenario: individual Grains keep state and execute additional complex computations, thus offloading your core backend services and moving the load to the Grains. For completeness, see the figure below to get an overview of the architecture.



Every provisioned device causes a Device Grain to be activated in the Orleans Universe. To provide some statistical functionality, one Universe Grain is also activated. This Grain keeps a list of Device Grains and is able to expose some statistical information, like the number of devices and the number of messages exchanged, as well as some methods to remove or disable devices. A Device Grain has the following interface:

    [ExtendedPrimaryKey]
    public interface IDeviceGrain : Orleans.IGrain
    {
        Task SetLatitude(string latitude);
        Task SetLongitude(string longitude);

        Task<string> GetLatitude();
        Task<string> GetLongitude();
        Task<string> GetDeviceID();
        Task RegisterDevice();
    }

To take the primary key inside your grain code use the following snippet. Override the ActivateAsync method and use the GetPrimaryKey method. The returned "out deviceID" is then stored inside the private string deviceID.

        private string deviceID;

        public override async Task ActivateAsync()
        {
            //store the deviceID
            this.GetPrimaryKey(out deviceID);
            this.State.DeviceID = deviceID;
            await base.ActivateAsync();
        }

To activate a device grain from your code have a look at the following:

            var device = DeviceGrainFactory.GetGrain(0, deviceID);
            await device.SetLatitude(latitude);
            await device.SetLongitude(longitude);

            await device.RegisterDevice();

As you can see, there is no new keyword to "create" a grain. You only ask the generated factory for a reference, and the runtime "activates" the grain when needed. The deviceID is passed as an additional parameter to the GetGrain method. This deviceID is then grabbed inside the ActivateAsync method of the Device Grain (GetPrimaryKey).

The interface consists of async methods and uses an extended primary key, since I want to use my own primary key mechanism. In this scenario, a device is uniquely identified by a guid.

So, the Grains interact with their Universe Grain (which keeps track of every grain and some statistical information, like the number of devices).

I also have one super Grain that takes care of all the SignalR functionality. Once a device is created:
- a Grain gets activated with the DeviceID Guid (returned from the provision backend service)
- this Grain also notifies the SignalR Grain that the device is registered
- the SignalR Grain maintains a SignalR Hub and calls methods on the clients to notify them of a new registration and draw a pushpin on the Bing Map.

The SignalR Grain is decorated with the [Reentrant] attribute to enable interleaving. By default, an activation of a grain doesn't receive a new request until the current request has been handled and all the promises created during the processing of that request are resolved. Marking a class [Reentrant] overrides this behavior: the activation may start processing another request while an earlier one is still waiting on a pending promise. Execution stays single-threaded and it is still one activation. Creating multiple activations of the same grain is what the [StatelessWorker] attribute is for.

See the code snippet on the SignalR Grain. This one is also used in the GPSTracker sample.

[Reentrant]
    public class PushNotifierGrain : Orleans.GrainBase, IPushNotifierGrain
    {
//implementation
}

The ActivateAsync method is overridden and opens a hub connection. The Hub, called IoTHub, is hosted inside the MVC app. To make sure the code works both in Azure and locally without Azure, you need to check RoleEnvironment.IsAvailable.

            //pick the hub address depending on whether the role environment is available
            var hubUrl = RoleEnvironment.IsAvailable
                ? "http://127.0.0.1:81"
                : "http://localhost:48777";

            var hubConnection = new HubConnection(hubUrl);
            var hub = hubConnection.CreateHubProxy("IoTHub");
            await hubConnection.Start();

Notice the await keyword, since everything here runs asynchronously!

The Universe Grain keeps a list of all the devices that are available in your ecosystem. This list is used when a new client (a web browser) opens the application containing the Bing map. This MVC application contains some WebAPI methods that interact with Orleans. One of them returns a list of all the current devices (a snapshot), which are then drawn on the Bing map. From that point on, new status updates and so on are pushed through the SignalR infrastructure.

The device registers itself at the SignalR Grain.

public async Task RegisterDevice()
        {
            var notifier = PushNotifierGrainFactory.GetGrain(0);
            await notifier.RegisterDevice(this);
        }

Inside the RegisterDevice method of the SignalR Grain:

public Task RegisterDevice(IDeviceGrain device)
        {
            foreach (var hub in hubs.Values)
            {
                try
                {
                    if (hub.State == ConnectionState.Connected)
                    {
                        //notify every attached hubclient (web, .NET client, Windows Store or others)
                        //the hub method Register is called with the specific device information (id, latitude, longitude)
                        hub.Invoke("Register", device.GetDeviceID(), device.GetLatitude(), device.GetLongitude());
                    }
                    else
                    {
                        hub.Start();
                    }
                }
                catch (Exception ex)
                {
                    Trace.TraceInformation("Something went wrong..." + ex.Message);
                }
            }
            return TaskDone.Done;
        }

Next time, more on the Universe Grain!


Thursday, May 1, 2014

Orleans in an IoT universe

I am currently working on a great "Internet of Things" project, entirely based on Microsoft Azure technology. To represent my "universe of devices" I use MSR Orleans, which provides a stateful universe of devices that interact with the outside world. The whole solution is built around Service Bus technology, where every device has its own subscription on the "Command" topic. Users or systems drop their commands in this topic and, depending on a SqlFilter, the different devices pick up a command and act upon it (perform some device-specific operation).

The figure below shows how this looks (simplified).


Because I also want to be able to track the history of operations performed by devices I decided to use Orleans to represent each device in a so called "Grain". Go to https://orleans.codeplex.com for more information on Orleans and how to get this baby running on your Azure services.

At the time a device is provisioned (by itself or by some operator), a new Service Identity and Relying Party are created on ACS. I do not use HTTPS or the like, because managing thousands of certificates, one for each and every device, is not a good idea. Besides, from a security perspective I don't want to connect directly to the device over some inbound socket connection; I want the initiative to be on the device side. Imagine a car driving around the country, where internet connectivity is sometimes there and sometimes isn't (inside a tunnel). We also don't want to communicate with a fixed IP address, since IP addresses can change in a roaming world.

So what do we have? Devices all over the country that:
- provision themselves with some deviceID at the server
- the server creates an identity and relying party application on ACS and return that information to the device (account + key)
- the device gets a URI to a Service Bus topic where it can drop off its events
- the device gets a URI to the subscription especially created for the device where it can listen to and act if some command appears on it.

At this point, devices can play a role in the ecosystem. To involve Orleans in this scenario I create another subscription on the "event" topic (obviously there will be multiple subscribers on this topic in a production environment). This subscription is used to kick off Orleans and operate on "Grains" that represent a device. A grain is created (implicitly) once a device is provisioned. From that point on, the grain lives forever (unless the device gets deleted), maintains the state of the device and also keeps track of the history, heartbeats and so on.

Because I want to reflect all the updates that take place on the devices' status, I also create one grain that is responsible for managing all SignalR traffic. Once a device drops an event on the event topic on SB:
- its corresponding grain (based on deviceID) gets updated and the new status is stored inside the Grain's memory (some list).
- the SignalR grain gets notified of the change, does all the SignalR magic and updates all connected clients. In this case, the "SignalR Grain" pushes updates to the operator that just sent a Command to a device (or all devices or a subgroup of devices).

This blogpost is to kick-off a new series of posts on IoT, Orleans and nice Azure features. Next time, more details!