Tuesday, September 15, 2015

Internet of Things, PowerBI and Cities Skylines


This blog post describes a nice way to demonstrate Smart City concepts. It uses a few components:

  1. A demonstration game called Cities Skylines (CSL)
  2. A mod that can be built using Visual Studio 2015 and runs inside Cities Skylines
  3. Microsoft Azure Event Hub
  4. Stream Analytics
  5. PowerBI
1. Cities Skylines
Cities Skylines is a simulation game that lets you build roads, houses, industry, commercial buildings, water towers, electricity plants and many more objects. Together they form the "city" that is the enabler for this demonstration.

Cities Skylines offers the ability to build so-called mods. In this demo, we are interested in some metadata of the city we are building and running.

2. Building a mod
To build a mod for CSL, you need to use the .NET Framework 3.5. For this .NET version there are no standard .NET libraries available for Service Bus / Event Hub, so I have created a wrapper around the available REST API. The most important method of this wrapper is SendTelemetry(object data).


public void SendTelemetry(object Data)
        {
            var sas = "";
            WebClient client = new WebClient();
            // Namespace info.
            var serviceNamespace = ;
            var hubName = ;
            var url = new Uri(string.Format(@"https://{0}.servicebus.windows.net/{1}/publishers/{2}/messages", serviceNamespace, hubName,
                );

            client.Headers[HttpRequestHeader.Authorization] = sas;
            client.Headers[HttpRequestHeader.KeepAlive] = "true";
            var payload = SimpleJson.SimpleJson.SerializeObject(Data);

            var result = client.UploadString(url, payload); //send the payload to the eventhub
        }
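
The sas variable above is left empty in the snippet. As a minimal sketch of how such a token can be produced, the Python below builds a Shared Access Signature in the documented Service Bus format (sr, sig, se, skn). The function name build_sas_token, its parameters and the example values are assumptions made for illustration; they are not part of the mod.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def build_sas_token(resource_uri, key_name, key, ttl_seconds=3600, now=None):
    """Build a Shared Access Signature for a Service Bus / Event Hub resource.

    All names here are hypothetical; only the token layout follows the
    documented format.
    """
    # Expiry is expressed as seconds since the Unix epoch.
    issued_at = int(time.time()) if now is None else int(now)
    expiry = str(issued_at + ttl_seconds)
    # The resource URI is URL-encoded before it is signed.
    encoded_uri = urllib.parse.quote_plus(resource_uri)
    string_to_sign = (encoded_uri + "\n" + expiry).encode("utf-8")
    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = urllib.parse.quote_plus(base64.b64encode(digest).decode("ascii"))
    return "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}".format(
        encoded_uri, signature, expiry, key_name)


# Example values only; use your own namespace, hub, policy name and key.
token = build_sas_token(
    "https://example.servicebus.windows.net/hub", "Send", "secret",
    ttl_seconds=60, now=1000)
print(token)
```

The resulting string is what goes into the Authorization header of the REST call.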


The data that is sent to the Event Hub is retrieved inside the mod:


var totalCapacity = DistrictManager.instance.m_districts.m_buffer[0].GetElectricityCapacity();
var totalConsumption = DistrictManager.instance.m_districts.m_buffer[0].GetElectricityConsumption();
var totalWaterCapacity = DistrictManager.instance.m_districts.m_buffer[0].GetWaterCapacity();
var totalWaterConsumption = DistrictManager.instance.m_districts.m_buffer[0].GetWaterConsumption();
var totalExport = DistrictManager.instance.m_districts.m_buffer[0].GetExportAmount();
var totalImport = DistrictManager.instance.m_districts.m_buffer[0].GetImportAmount();
var totalIncome = DistrictManager.instance.m_districts.m_buffer[0].GetIncomeAccumulation();
var totalSewageCapacity = DistrictManager.instance.m_districts.m_buffer[0].GetSewageCapacity();
var totalSewage = DistrictManager.instance.m_districts.m_buffer[0].GetSewageAccumulation();
var totalUnemployed = DistrictManager.instance.m_districts.m_buffer[0].GetUnemployment();
var totalWorkplace = DistrictManager.instance.m_districts.m_buffer[0].GetWorkplaceCount();
var totalWorkers = DistrictManager.instance.m_districts.m_buffer[0].GetWorkerCount();
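
The values read above end up as one JSON message per measurement. A rough sketch of that message shape in Python follows. The mapping from the mod's variable names to the field names used in the Stream Analytics query of this post is my assumption, and FIELD_MAP and build_payload are hypothetical names.

```python
import json

# Assumed mapping: mod variable name -> field name used in the Stream Analytics query.
FIELD_MAP = {
    "totalCapacity": "TotalElectricityCapacity",
    "totalConsumption": "TotalElectricityConsumption",
    "totalWaterCapacity": "TotalWaterCapacity",
    "totalWaterConsumption": "TotalWaterConsumption",
    "totalSewageCapacity": "TotalSewageCapacity",
    "totalSewage": "TotalSewageAccumulation",
}


def build_payload(readings):
    """Turn a dict of mod readings into the JSON text sent to the Event Hub."""
    unknown = set(readings) - set(FIELD_MAP)
    if unknown:
        raise KeyError("no field mapping for: {0}".format(sorted(unknown)))
    message = {FIELD_MAP[name]: value for name, value in readings.items()}
    # sort_keys keeps the output stable, which makes it easier to compare messages.
    return json.dumps(message, sort_keys=True)


print(build_payload({"totalCapacity": 100, "totalSewage": 7}))
```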



3. Event Hub
It is easy to create an Event Hub on the Azure portal (portal.azure.com) if you have an MSDN subscription or a different subscription (a free trial is also available). The Event Hub just needs to be set up.

4. Stream Analytics 
The stream analytics job takes the messages from the Event Hub that is being populated by the mod running in CSL. The SA job consists of three important concepts:
- Input: where does the information come from? In this case, it is the event hub that is created in step 3. 
- Query: the query that will run constantly on the data in-flight on the Event Hub. For this scenario we take the average of different fields that are part of the message sent to the Event Hub by the CSL mod.


SELECT
    System.Timestamp as EventDate, 
    AVG(TotalWaterCapacity) as TotalWaterCapacity, 
    AVG(TotalWaterConsumption) as TotalWaterConsumption,
    AVG(TotalElectricityCapacity) as TotalElectricityCapacity,
    AVG(TotalElectricityConsumption) as TotalElectricityConsumption,
    AVG(TotalExport) as TotalExport,
    AVG(TotalImport) as TotalImport,
    AVG(TotalSewageCapacity) as TotalSewageCapacity,
    AVG(TotalSewageAccumulation) as TotalSewageAccumulation,
    AVG(TotalUnemployed) as TotalUnemployed,
    AVG(TotalWorkplace) as TotalWorkplace,
    AVG(TotalWorkers) as TotalWorkers,
    AVG(TotalCarsInCongestion) as TotalCarsInCongestion,
    AVG(TotalVehicles) as TotalVehicles
INTO
    electricitybuilding
FROM
    iotwatch
GROUP BY TUMBLINGWINDOW(SS,10), EventDate

- Output: the output of this SA job is PowerBI. Every result produced by SA is pushed to the PowerBI environment that we set up in step 5.
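
To make the windowing in the query more concrete, here is a small Python simulation of what a 10-second tumbling window with AVG does. It is only a sketch: the function name tumbling_average is hypothetical, and the window boundary handling is simplified compared to the real Stream Analytics semantics.

```python
from collections import defaultdict


def tumbling_average(events, window_seconds=10):
    """Average values per fixed, non-overlapping window.

    events is an iterable of (timestamp_in_seconds, value) pairs.
    The result maps the end time of each window to the average of its values.
    """
    buckets = defaultdict(list)
    for timestamp, value in events:
        # Every event falls into exactly one window.
        window_start = (timestamp // window_seconds) * window_seconds
        buckets[window_start + window_seconds].append(value)
    return {end: sum(values) / len(values) for end, values in sorted(buckets.items())}


# Three readings: two in the first window, one in the second.
print(tumbling_average([(1, 10), (4, 20), (12, 40)]))
```

Each window produces one output row, which is why the dashboard updates in 10-second steps rather than per message.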


5. PowerBI
On app.powerbi.com you can create an online account for using PowerBI in this demo. Just create an account and you are good to go.

Once the game is running with the mod enabled, it starts sending the data to the Event Hub. The SA job processes that data and forwards it to your PowerBI environment. When SA processes data successfully, there will be a dataset available in your PowerBI environment called "electricity". With this dataset we can create reports and pin them to our dashboard in PowerBI.


6. The Result
A great game running a city and a stunning dashboard in PowerBI that contains realtime data coming from the game!



The PowerBI output. You can see I demolished some buildings, which left some essential utilities without enough capacity. This causes shortages of electricity, water and sewage capacity and an increasing unemployment rate!





If you need more detailed information on the specific steps, just let me know.

Enjoy!


Tuesday, June 30, 2015

Orleans and the Azure Redis Cache


Recently, I've been diving into Orleans again, the great open-source framework that provides an approach for building large-scale, distributed computing applications. It takes care of concurrency and scaling patterns for you so you can focus on the application itself.

I have been blogging about Orleans before, also explaining about Grains and what they can do for you.

At the moment, I am designing and building a large IoT solution, and Orleans is used to keep the state of Raspberry Pi 2 devices out in the field. By default, grain state is persisted in Azure Table Storage, but I decided to use Redis instead.

To build your own StorageProvider for Orleans we need to build a class that implements IStorageProvider. See the full implementation below.

 public class RedisStorage : IStorageProvider

    {
        private ConnectionMultiplexer connectionMultiplexer;
        private IDatabase redisDatabase;

        private const string REDIS_CONNECTION_STRING = "RedisConnectionString";
        private const string REDIS_DATABASE_NUMBER = "DatabaseNumber";

        private string serviceId;

        /// <summary> Name of this storage provider instance. </summary>
        /// <see cref="IProvider#Name"/>
        public string Name { get; private set; }

        /// <summary> Logger used by this storage provider instance. </summary>
        /// <see cref="IStorageProvider#Log"/>
        public Logger Log { get; private set; }


        /// <summary> Initialization function for this storage provider. </summary>
        /// <see cref="IProvider#Init"/>
        public async Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config)
        {
            Name = name;
            serviceId = providerRuntime.ServiceId.ToString();

            if (!config.Properties.ContainsKey(REDIS_CONNECTION_STRING) ||
                string.IsNullOrWhiteSpace(config.Properties[REDIS_CONNECTION_STRING]))
            {
                throw new ArgumentException("RedisConnectionString is not set.");
            }
            var connectionString = config.Properties[REDIS_CONNECTION_STRING];

            connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(connectionString);

            if (!config.Properties.ContainsKey(REDIS_DATABASE_NUMBER) ||
                string.IsNullOrWhiteSpace(config.Properties[REDIS_DATABASE_NUMBER]))
            {
                //do not throw an ArgumentException but use the default database
                redisDatabase = connectionMultiplexer.GetDatabase();
            }
            else
            {
                var databaseNumber = Convert.ToInt16(config.Properties[REDIS_DATABASE_NUMBER]);
                redisDatabase = connectionMultiplexer.GetDatabase(databaseNumber);
            }

            Log = providerRuntime.GetLogger("StorageProvider.RedisStorage." + serviceId);
        }

        // Internal method to initialize for testing
        internal void InitLogger(Logger logger)
        {
            Log = logger;
        }

        /// <summary> Shutdown this storage provider. </summary>
        /// <see cref="IStorageProvider#Close"/>
        public Task Close()
        {
            connectionMultiplexer.Dispose();
            return TaskDone.Done;
        }

        /// <summary> Read state data function for this storage provider. </summary>
        /// <see cref="IStorageProvider#ReadStateAsync"/>
        public async Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            var primaryKey = grainReference.ToKeyString();

            if (Log.IsVerbose3)
            {
                Log.Verbose3((int)ProviderErrorCode.RedisStorageProvider_ReadingData, "Reading: GrainType={0} Pk={1} Grainid={2} from Database={3}", grainType, primaryKey, grainReference, redisDatabase.Database);
            }

            RedisValue value = await redisDatabase.StringGetAsync(primaryKey);
            var data = new Dictionary<string, object>();
            if (value.HasValue)
            {
                data = JsonConvert.DeserializeObject<Dictionary<string, object>>(value);
            }

            grainState.SetAll(data);

            grainState.Etag = Guid.NewGuid().ToString();
        }

        /// <summary> Write state data function for this storage provider. </summary>
        /// <see cref="IStorageProvider#WriteStateAsync"/>
        public async Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            var primaryKey = grainReference.ToKeyString();
            if (Log.IsVerbose3)
            {
                Log.Verbose3((int)ProviderErrorCode.RedisStorageProvider_WritingData, "Writing: GrainType={0} PrimaryKey={1} Grainid={2} ETag={3} to Database={4}", grainType, primaryKey, grainReference, grainState.Etag, redisDatabase.Database);
            }
            var data = grainState.AsDictionary();

            var json = JsonConvert.SerializeObject(data);
            await redisDatabase.StringSetAsync(primaryKey, json);
        }

        /// <summary> Clear state data function for this storage provider. </summary>
        /// <remarks>
        /// </remarks>
        /// <see cref="IStorageProvider#ClearStateAsync"/>
        public Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            var primaryKey = grainReference.ToKeyString();
            if (Log.IsVerbose3)
            {
                Log.Verbose3((int)ProviderErrorCode.RedisStorageProvider_ClearingData, "Clearing: GrainType={0} Pk={1} Grainid={2} ETag={3} from Database={4}", grainType, primaryKey, grainReference, grainState.Etag, redisDatabase.Database);
            }
            //remove from cache without blocking the calling thread
            return redisDatabase.KeyDeleteAsync(primaryKey);
        }
    }


The code is pretty straightforward and you can decorate your grain class with the right attribute.

[StorageProvider(ProviderName = "RedisStorage")]

public class HelloGrain : Orleans.Grain<IHelloState>, HelloWorldInterfaces.IHello


The State of HelloGrain will now be persisted inside Redis instead of Azure TableStorage.
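
Stripped of the Orleans and Redis specifics, the provider boils down to three operations on a key-value store with JSON-serialized state. The Python below is a minimal sketch of that pattern, with a plain dictionary standing in for Redis. The class InMemoryStateStore and its method names are hypothetical and only mirror the shape of the C# provider above.

```python
import json
import uuid


class InMemoryStateStore:
    """Stand-in for the Redis database: a dict keyed by the grain key."""

    def __init__(self):
        self._data = {}

    def read_state(self, grain_key):
        # A missing key yields empty state, like the provider above.
        raw = self._data.get(grain_key)
        state = json.loads(raw) if raw is not None else {}
        # A fresh ETag is handed out on every read.
        return state, str(uuid.uuid4())

    def write_state(self, grain_key, state):
        # State is stored as a JSON string under the grain key.
        self._data[grain_key] = json.dumps(state)

    def clear_state(self, grain_key):
        # Clearing simply deletes the key; a missing key is not an error.
        self._data.pop(grain_key, None)


store = InMemoryStateStore()
store.write_state("hello-grain-1", {"greeting": "hi"})
print(store.read_state("hello-grain-1")[0])
```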

See how my cache gets hit all the time!



I will also upload this to the OrleansContrib page on GitHub at https://github.com/OrleansContrib/Orleans.StorageProvider.Redis. I will keep you posted on the progress on my blog.


Monday, June 15, 2015

Connect your devices to Azure!


How to connect your devices to IoT assets on Azure?
I’ve been working with devices and Azure for quite some time now and have already built a lot of (backend) services, including Stream Analytics jobs for analyzing and alerting as well as Service Bus topics and subscriptions.
I have a solution that is multi-tenant and provides provisioning logic for tenants and devices. One can register and create a tenant, and by using the Service Management Libraries I am able to provision a new Service Bus namespace for that specific tenant only. The system returns an ApiKey which must be used in calls to that specific tenant.
After the creation of a tenant, devices can register themselves (or you can preregister them in a portal) by calling a “registerdevice” WebAPI with the ApiKey in the HTTP header. The multi-tenant system is then able to register the device and return the information the device needs to communicate with the Service Bus assets in that tenant. So what can a device do?
1. Listen for specific commands sent to the device or a group of devices. For this purpose, the provisioning system creates a topic (commands) and, for every device that registers, a designated subscription.
2. Send events to a topic. For this purpose, the provisioning system enables a registered device to send (and only send) messages to a topic called ‘events’.
3. Send telemetry data to an Event Hub. The device can send high-speed information like GPS or temperature measurements to the Event Hub, where other systems can process, evaluate, analyze and report on it.
Every device follows this pattern, which makes it easy to extend the system with tens, hundreds or thousands of devices.
Remember that a topic can only have up to 2,000 subscriptions; that’s why I use autoforwarding and fan-out. The single entry point of the “commanding” system is a topic that has multiple subscriptions, depending on the size of the system (meaning the number of devices). E.g. a system containing 10,000 devices needs 5 additional Service Bus topics, each containing 2,000 subscriptions. Sending one command to the “master topic” results in autoforwarding to the 5 additional topics, which lets the system deliver a command to each and every one of the 10,000 devices if needed.
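
The sizing arithmetic is easy to sketch. The Python below computes how many forwarding topics a given number of devices needs and which topic a device lands on. The helper names and the idea of assigning devices by a running index are assumptions for illustration; the commands-N naming follows the topic names used later in this post.

```python
import math

MAX_SUBSCRIPTIONS_PER_TOPIC = 2000  # the limit mentioned above


def topics_needed(device_count, per_topic=MAX_SUBSCRIPTIONS_PER_TOPIC):
    """Number of forwarding topics required so every device gets a subscription."""
    return math.ceil(device_count / per_topic)


def topic_for_device(device_index, per_topic=MAX_SUBSCRIPTIONS_PER_TOPIC):
    """Name of the forwarding topic that holds the subscription of this device."""
    return "commands-{0}".format(device_index // per_topic)


print(topics_needed(10000))
print(topic_for_device(0), topic_for_device(2000), topic_for_device(9999))
```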
Now that we have the provisioning logic in place, we need to connect devices!
I’ve built device agents that can run on devices or systems and currently support .NET and Python (I will add more in the future). The .NET device agent can run on any device that supports .NET, obviously. The Python agent can run on, for example, a Raspberry Pi (2).
The provisioning system generates SharedAccessSignatures for:
– the subscription of the device so that only the device has listen access
– the ‘events’ topic so that the device has send rights to post back incidental events like heartbeats or status information
– the event hub of the tenant to post telemetry data.
In the backend provisioning system, this happens:
string listenSaS = SharedAccessSignatureTokenProvider.GetSharedAccessSignature(
    "CommandTopicListenRule", ListenSAS, subUri.ToString().Trim('/'), TimeSpan.FromDays(365 * 10));
The listenSaS variable contains a SAS that is generated based on the CommandTopicListenRule shared access policy as configured in the Azure portal (see below). The SAS is valid for 10 years. The provisioning system contains functions to freeze a device, e.g. when it has been compromised.
[Image: sharedaccesspolicy]
The current version of the Azure SDK for Python supports only ACS (which is old-school) and a combination of shared_access_key and shared_access_key_value. Since my system generates SAS tokens only, I had to change the Python Service Bus SDK to support SAS only as well.
The change I made is:
At line 129 add:
elif shared_access_key_value:
    self.authentication = ServiceBusSASAuthentication(key_value=shared_access_key_value)
and in the signing of a request replace the function with:
def sign_request(self, request, httpclient):
    if self.key_value is not None and self.key_name is None:
        request.headers.append(('Authorization', self.key_value))
    else:
        request.headers.append(('Authorization', self._get_authorization(request, httpclient)))
This piece of Python is at line 1190. Save servicebusservice.py and you are now able to use SAS only!
The python device agent I created is simple to use:
self.agent = Agent(, url=)
#register device with its MAC address.
self.agent.register_device(getnode())
#respond to incoming commands, just print for now
self.agent.on_messagereceived = self.showmessage
#start the python device agent!
self.agent.start()
#send message to event hub of this tenant, example.
msg = Message('Agent started on {0}'.format(self.agent.deviceId).encode('utf-8'), custom_properties={})
self.agent.send_telemetry(msg)
#send an event to the events topic of the system
msg2 = Message('Agent started on {0}'.format(self.agent.deviceId).encode('utf-8'), custom_properties={})
self.agent.send_event(msg2)
I use Service Bus Explorer to test the agent and to send messages to this specific device. I can send to a specific device by adding a message property containing the device id. See below.
[Image: devicemessage]
Now I click on start and the message is sent to the commands topic in the tenant-specific environment. The autoforwarding system ensures that the message is forwarded to commands-0 (first partition) and commands-1 (second partition). Since every device has its own subscription with a SqlFilter on it containing the deviceId, this message only flows to the subscription of the device I added to the message property in Service Bus Explorer.
[Image: sbexplorer]
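The routing described above can be simulated in a few lines of Python. This is only a sketch of the idea: the master topic forwards a copy to every partition topic, and each subscription keeps the message only if its filter matches. The property name deviceId, the subscription names and the deliver function are assumptions for illustration, and the equality check stands in for a real SqlFilter.

```python
def deliver(message_properties, forward_topics):
    """Return the (topic, subscription) pairs that receive the message.

    forward_topics maps a topic name to {subscription_name: device_id}.
    """
    target = message_properties.get("deviceId")
    deliveries = []
    # The master topic forwards a copy of the message to every partition topic.
    for topic_name, subscriptions in sorted(forward_topics.items()):
        for subscription_name, device_id in sorted(subscriptions.items()):
            # Stand-in for a SqlFilter that compares the deviceId property.
            if device_id == target:
                deliveries.append((topic_name, subscription_name))
    return deliveries


topics = {
    "commands-0": {"sub-device-a": "device-a"},
    "commands-1": {"sub-device-b": "device-b"},
}
print(deliver({"deviceId": "device-b"}, topics))
```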
The output of the Python device agent is:
[Image: pythonoutput]
As you can see, the agent is started, it sent one event to the ‘events’ topic, and sending the message from Service Bus Explorer resulted in the message payload being printed to the screen.
This piece of Python can easily be copied to a Raspberry Pi and enable the Pi to join the IoT system!
For questions or feedback, don’t hesitate!

Tuesday, August 5, 2014

Internet of Things and Event Hub

It's been a while but here is a new blog post on the recently released Event Hub of Microsoft and how to use it in an Internet of Things scenario.

For more information on Event Hub go to the Microsoft site http://azure.microsoft.com/en-us/services/event-hubs.

Consider Event Hub as a vehicle to which you can send millions of events per second to offload your devices in your IoT scenario. I had issues before with Service Bus topics related to the maximum number of authorizations per second. Event Hub is built on top of Azure Table Storage, enabling it to scale almost without limit. See the figure below to see where Event Hub fits in your architecture (from the Microsoft website).

Event Hubs Architecture


To get things started you need to create an Event Hub in your Service Bus namespace. In this example I created an event hub called "eventhub" in my internetofthings service bus namespace. You can also create event hubs from code. As you can see, I have a partition count of 8. This means I need 8 instances/threads/processes, one for every single partition, for maximum throughput.



The following code shows how to create senders of events and how to create a receiver.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System.Threading;
using Newtonsoft.Json;


namespace EventHubTutorial
{
    public  class SampleEvent
    {
        public string Payload { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var cts = new CancellationTokenSource();

            //create 8 threads, for every partition one.
            for (int i = 0; i < 8; i++)
            {
                Task.Factory.StartNew((state) =>
                {
                    Console.WriteLine("Thread for partition: {0}", state);

                    var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", "internetofthings", ""), new MessagingFactorySettings()
                    {
                        TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("Listener", "AsrrSbT/QIxUc73Bvg0t4GqP9O988xHrQDNSCTAjHXc="),
                        TransportType = TransportType.Amqp
                    });

                    var receiver = factory.CreateEventHubClient("eventhub")
                        .GetDefaultConsumerGroup()
                        .CreateReceiver(state.ToString(), DateTime.UtcNow);


                    while (true)
                    {
                        if (cts.IsCancellationRequested)
                        {
                            receiver.Close();
                            break;
                        }

                        // Receive could fail, I would need a retry policy etc...
                        var message = receiver.Receive(TimeSpan.FromMinutes(5));
                        if (message != null)
                        {
                            var logMessage = Newtonsoft.Json.JsonConvert.DeserializeObject<SampleEvent>(Encoding.Default.GetString(message.GetBytes()));

                            Console.WriteLine("{0}: {1}", DateTime.Now, logMessage.Payload);
                            Console.WriteLine("Event handled from partition {0} by thread {1}. Sequence:{2}", 
                                message.PartitionKey, state, message.SequenceNumber);
                        }
                    }
                }, i);
            }

            //as long as I don't quit...send number of events I enter.
            string input = String.Empty;
            int numberOfEvents = 0;
            //create sender.
            var factorySender = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", "internetofthings", ""), new MessagingFactorySettings()
            {
                TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("Sender", "QYUfWxvoQWJgSeov8qIW3aeQeRgnq+MJ7rCl+CyAclE="),
                TransportType = TransportType.Amqp
            });
            EventHubClient _client = factorySender.CreateEventHubClient("eventhub");
            Random rnd = new Random();
            
            do
            {
                input = Console.ReadLine();
                if (int.TryParse(input, out numberOfEvents))
                {

                    //generate events.
                    Console.WriteLine("Number of events generated:{0}", numberOfEvents);
                    for (int y = 0; y < numberOfEvents; y++)
                    { 
                        var eventData = new EventData(Encoding.Default.GetBytes(JsonConvert.SerializeObject(new SampleEvent()
                        {
                            Payload = "randomdata:" + DateTime.Now.Ticks.ToString()
                        })));

                        //somewhere between 0 and 7 (the upper bound of Random.Next is exclusive)
                        eventData.PartitionKey = rnd.Next(0, 8).ToString();
                        _client.Send(eventData);
                    }
                }
            } 
            while (input.CompareTo("quit") != 0);
        }
    }
}



These 100 lines of code demonstrate how to create 8 threads. Every single thread receives messages from one partition. Notice that you need to do the partitioning yourself. If you send all your messages to the same partition, the number of messages is limited to what a single partition can handle (

At the end of Main, a do-while loop sends the number of messages that you enter in the console app. If you enter "quit" the app exits.
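
To see why the choice of partition key matters, the following Python sketch maps keys to one of 8 partitions and counts how the events spread. The hash used here (SHA-256 modulo the partition count) is only an illustration and not the algorithm Event Hub uses internally; partition_for and spread are hypothetical names.

```python
import hashlib
from collections import Counter

PARTITION_COUNT = 8  # same partition count as the event hub in this post


def partition_for(key, partition_count=PARTITION_COUNT):
    """Map a partition key to a partition number in a stable way."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % partition_count


def spread(keys):
    """Count how many events land on each partition."""
    return Counter(partition_for(key) for key in keys)


# Many distinct keys spread over the partitions ...
print(spread("device-{0}".format(i) for i in range(1000)))
# ... while a single key always ends up on one partition.
print(spread("device-1" for _ in range(1000)))
```

With a single key, all traffic is served by one partition and one receiver thread, which is the bottleneck described above.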

I think the code is pretty straightforward :-)

Remember that the next step would be to route the received message to a more permanent store like Service Bus or Orleans for further processing....the Event Hub

Saturday, May 24, 2014

Monitorable Grains

In the Orleans world, grains are somewhat 'lost' by default. You can activate grains and the runtime takes care of their lifecycle and decides when they are deactivated. For my scenario, I need some sort of grain that is supervised by a Supervisor (or UniverseGrain). The Supervisor can for instance:

- keep track of the amount of activated grains
- keep the history of heartbeats
- notify others on the heartbeat
- more housekeeping jobs

By default, grains don't contain this kind of functionality. I created a new type of grain as an abstract class called MonitorableGrain, indicating that it can be monitored somehow. What does the MonitorableGrain do by default?

- on activation, register itself at the SupervisorGrain
- by using a timer, send out a heartbeat every X minutes to the supervisor to indicate it is still alive
- on deactivation, unregister itself at the supervisor.

All my future grains will derive from the abstract MonitorableGrain and inherit this behaviour by default. The IMonitorableGrain interface contains no methods yet. See below for the code.

public abstract class MonitorableGrain : Orleans.GrainBase, IMonitorableGrain, Orleans.IGrain
    {
        IOrleansTimer heartBeatTimer = null;
        ISupervisorGrain superVisorGrain = null;

        public override Task ActivateAsync()
        {
            Trace.TraceInformation("Activation!");
            //register the grain at the superVisor (assign the field, not a new local variable)
            superVisorGrain = SupervisorGrainFactory.GetGrain(0);
            //create a timer that executes a heartbeat. Timer instead of Reminder since we are only interested in activated grains
            heartBeatTimer = RegisterTimer(HeartBeat, this, TimeSpan.FromMinutes(0), TimeSpan.FromSeconds(15));

            superVisorGrain.RegisterGrain(this);
            return base.ActivateAsync();
        }


        private Task HeartBeat(object grain)
        {
            Trace.WriteLine(String.Format("Heartbeat of {0} at {1}", this.GetPrimaryKey().ToString(), DateTime.Now.ToLongTimeString()));
            superVisorGrain.HeartBeat(grain as IMonitorableGrain, "I am still alive");
            return TaskDone.Done;
        }

        public override Task DeactivateAsync()
        {
            Trace.WriteLine("Deactivation!");
            superVisorGrain.UnregisterGrain(this);
            heartBeatTimer.Dispose();
            return base.DeactivateAsync();
        }
    }

In the snippets below you will see the ISupervisorGrain interface and the implementation.

public interface ISupervisorGrain : Orleans.IGrain
{
   Task<int> NumberOfGrains();
   Task RegisterGrain(IMonitorableGrain monitorableGrain);
   Task UnregisterGrain(IMonitorableGrain monitorableGrain);
   Task HeartBeat(IMonitorableGrain grain, string heartBeatMessage);
}

public class HeartBeat 
    {
        private Guid grainId;

        public Guid GrainId
        {
          get { return grainId; }
          set { grainId = value; }
        }

        private DateTime heartBeatTime;

        public DateTime HeartBeatTime
        {
          get { return heartBeatTime; }
          set { heartBeatTime = value; }
        }

        private string heartBeatMessage;

        public string HeartBeatMessage
        {
          get { return heartBeatMessage; }
          set { heartBeatMessage = value; }
        }

        public HeartBeat(Guid grainId, DateTime heartBeatTime, string heartBeatMessage)
        {
            this.grainId = grainId;
            this.heartBeatTime = heartBeatTime;
            this.heartBeatMessage = heartBeatMessage;
        }
    }

public class SupervisorGrain : Orleans.GrainBase, ISupervisorGrain
    {
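        //list of grains, keyed by their primary key
        Dictionary<Guid, IMonitorableGrain> grains = new Dictionary<Guid, IMonitorableGrain>();
        //history of heartbeats
        List<HeartBeat> heartBeats = new List<HeartBeat>();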

        public override Task ActivateAsync()
        {
            Trace.TraceInformation("Activation");
            return base.ActivateAsync();
        }

        public Task<int> NumberOfGrains()
        {
            Trace.TraceInformation("NumberOfGrains");
            return Task.FromResult<int>(grains.Count);
        }


        public Task RegisterGrain(IMonitorableGrain monitorableGrain)
        {
            Trace.TraceInformation("RegisterGrain");
            grains.Add(monitorableGrain.GetPrimaryKey(), monitorableGrain);
            return TaskDone.Done;
        }

        public Task HeartBeat(IMonitorableGrain grain, string heartBeatMessage)
        {
            Trace.TraceInformation("HeartBeat");
            heartBeats.Add(new HeartBeat(grain.GetPrimaryKey(), DateTime.Now, heartBeatMessage));
            return TaskDone.Done;
                
        }

        public Task UnregisterGrain(IMonitorableGrain monitorableGrain)
        {
            Trace.TraceInformation("UnregisterGrain");
            grains.Remove(monitorableGrain.GetPrimaryKey());
            return TaskDone.Done;
        }
    }

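
The same register / heartbeat / unregister bookkeeping can be sketched outside Orleans in a few lines of Python. The Supervisor class below is hypothetical and only mirrors the shape of the grain above. The stale method is an extra I added to show what the heartbeat history can be used for; it is not part of the C# code.

```python
class Supervisor:
    """Keeps track of registered grains and their heartbeat history."""

    def __init__(self):
        self.grains = set()
        self.heartbeats = []  # history of (grain_id, timestamp, message)

    def register(self, grain_id):
        self.grains.add(grain_id)

    def unregister(self, grain_id):
        self.grains.discard(grain_id)

    def number_of_grains(self):
        return len(self.grains)

    def heartbeat(self, grain_id, timestamp, message):
        self.heartbeats.append((grain_id, timestamp, message))

    def stale(self, now, max_age_seconds):
        """Registered grains without a heartbeat in the last max_age_seconds."""
        last_seen = {}
        for grain_id, timestamp, _ in self.heartbeats:
            last_seen[grain_id] = max(timestamp, last_seen.get(grain_id, timestamp))
        return sorted(
            grain_id for grain_id in self.grains
            if grain_id not in last_seen or now - last_seen[grain_id] > max_age_seconds
        )


supervisor = Supervisor()
supervisor.register("grain-1")
supervisor.register("grain-2")
supervisor.heartbeat("grain-1", 100, "I am still alive")
supervisor.heartbeat("grain-2", 130, "I am still alive")
print(supervisor.stale(now=140, max_age_seconds=20))
```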





I will continue to improve the MonitorableGrain and SupervisorGrain, but this is a good start.






Thursday, May 15, 2014

Grains and Reminders

The previous blog posts were about Orleans and Grains and how to build up a nice universe of grains. This post will demonstrate how to use reminders.

Reminders are similar to timers, but there are a few differences:
- reminders always trigger, even when the grain is not currently activated.
- a grain gets reactivated when the reminder fires.
- reminders should not be used for high-frequency timing purposes (don't fire one every second).

Reminders are a great fit for 'housekeeping' purposes. In my scenario, where devices are represented virtually by a grain and keeping state and history of the device, reminders will be used to periodically check if the device out there is still provisioned and part of the IoT game. Consider when I remove service identities on ACS. After removal devices are not able to send their temperatures to the event topic anymore and are not able to receive any commands in their subscription anymore. This probably means that the device is turned off, removed, stolen or any other reasons to remove the device from the universe. Using a reminder in this scenario can be helpful!

The reminder will perform the following tasks:
- fire every 10 minutes
- check on ACS whether the associated service identity is still there
- if the service identity is missing, set the device grain to an "inactive" state, remove it from the universe grain's devices list and move it to the universe grain's "inactive devices" list
- remove any other ACS-related administration (rule groups, relying party)
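
The housekeeping steps above can be sketched in plain C#, without any Orleans or ACS types. This is only a rough illustration under assumptions: IIdentityDirectory, InMemoryIdentityDirectory, DeviceRegistry and Housekeeping are hypothetical names invented for this sketch, and the real ACS lookup is replaced by an in-memory set.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the ACS lookup; not a real ACS or Orleans API.
public interface IIdentityDirectory
{
    bool ServiceIdentityExists(string deviceId);
    void RemoveRelatedConfiguration(string deviceId); // e.g. rule groups, relying party
}

// Hypothetical in-memory implementation, used only to make the sketch runnable.
public class InMemoryIdentityDirectory : IIdentityDirectory
{
    public readonly HashSet<string> Identities = new HashSet<string>();
    public readonly List<string> CleanedUp = new List<string>();

    public bool ServiceIdentityExists(string deviceId) { return Identities.Contains(deviceId); }
    public void RemoveRelatedConfiguration(string deviceId) { CleanedUp.Add(deviceId); }
}

// Hypothetical model of the universe grain's two device lists.
public class DeviceRegistry
{
    public readonly HashSet<string> Active = new HashSet<string>();
    public readonly HashSet<string> Inactive = new HashSet<string>();
}

public static class Housekeeping
{
    // Runs one housekeeping pass for a device.
    // Returns true when the device was moved to the inactive list.
    public static bool Run(string deviceId, IIdentityDirectory directory, DeviceRegistry registry)
    {
        if (directory.ServiceIdentityExists(deviceId))
        {
            return false; // still provisioned, nothing to do
        }

        // The identity is gone: mark the device inactive and clean up what is left.
        registry.Active.Remove(deviceId);
        registry.Inactive.Add(deviceId);
        directory.RemoveRelatedConfiguration(deviceId);
        return true;
    }
}
```

In a grain, a call like Housekeeping.Run(...) would sit inside ReceiveReminder, with the directory backed by the real ACS management calls.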

Implementing this takes the device grain out of play and puts the device in a blocked or inactive state (removing a grain is not possible in Orleans; you can only activate or deactivate it). The blocked/inactive device grain will appear on the client as a red pushpin indicating its deactivated state. Obviously, the real device no longer sends updates, since it was removed, stolen or turned off.

Reminders are persisted (they need to be, since they also get triggered on deactivated grains!).

To make sure your reminders are persistent, you need to add the following line to your OrleansConfiguration.xml settings.

ReminderServiceType="AzureTable"/>

To enable reminders on your grain you need to implement the IRemindable interface.


public class DeviceGrain : Orleans.GrainBase, IDeviceGrain, IRemindable

The reminder below starts right away and fires every 10 minutes. It is registered in the RegisterDevice() method of the DeviceGrain.

private IOrleansReminder reminder;

async Task IDeviceGrain.RegisterDevice()
{
   // start immediately (due time 0) and fire every 10 minutes
   reminder = await RegisterOrUpdateReminder("Housekeeping", TimeSpan.FromMinutes(0), TimeSpan.FromMinutes(10));

   var universe = UniverseGrainFactory.GetGrain(0);
   await universe.RegisterDevice(this);
}

public Task ReceiveReminder(string reminderName, TickStatus status)
{
   Trace.WriteLine("Housekeeping reminder is triggered");
   return TaskDone.Done;
}

Explicitly deregistering the device grain, by user or system action, also unregisters the reminder we created before. This stops the reminder from being triggered any further.


public async Task UnregisterDevice()
{
   //make sure the reminder goes away, since reminders are persistent! Explicitly remove the reminder
   await UnregisterReminder(reminder);
}

Every time the reminder is triggered, I check my ACS assets to see if the device (its service identity) was removed. Reminders are a great way to do housekeeping tasks in the Internet of Things world. They keep both your grain universe and related LOB systems (in my case ACS) as clean as possible, and keep Orleans in a state where it is an exact representation of your real IT assets.

Happy coding!





Thursday, May 8, 2014

Grains, Grains and more Grains

The last two blog posts demonstrated how to use Orleans in the context of the Internet of Things. This post outlines how to set up your universe of Grains (in my scenario a UniverseGrain, a SignalRGrain and multiple DeviceGrains, each representing a real device somewhere out there). The picture below depicts this ecosystem of Grains.




Here are the interfaces of my three types of Grains. The Orleans SDK generates code at compile time and creates a factory for every interface that derives from the Orleans.IGrain marker interface.

The UniverseGrain

public interface IUniverseGrain : Orleans.IGrain
    {
        Task<int> NumberOfDevices();
        Task<double> AverageTemperature();
        Task RegisterDevice(IDeviceGrain grain);
        Task<List<IDeviceGrain>> GetDevices();
    }

1. NumberOfDevices(): returns the total number of devices that are registered to the universe.
2. AverageTemperature(): takes the average temperature of every single device and averages those values across all devices.
3. RegisterDevice(): a DeviceGrain registers itself at the universe.
4. GetDevices(): returns a list of registered DeviceGrains. This comes in handy to get a 'snapshot' of the current device universe. This method is called by the SignalR hub in the OnConnected override. Every time a new browser client connects, the hub calls this method on the UniverseGrain to get all the devices and sends this information to the browser, where pushpins are drawn and temperatures are shown.
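
As a rough illustration of what AverageTemperature() computes, here is a minimal plain-C# sketch of an "average of averages". UniverseStatistics and AverageOfAverages are hypothetical names for this sketch, the input is a list of temperature histories (one per device) rather than real grains, and returning 0.0 when there is no data is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class UniverseStatistics
{
    // Averages each device's history first, then averages those per-device values.
    // Devices without readings are skipped; 0.0 for "no data at all" is an assumption.
    public static double AverageOfAverages(IEnumerable<IEnumerable<double>> historiesPerDevice)
    {
        var deviceAverages = historiesPerDevice
            .Where(history => history.Any())
            .Select(history => history.Average())
            .ToList();

        return deviceAverages.Count == 0 ? 0.0 : deviceAverages.Average();
    }
}
```

Note that this weighs every device equally, regardless of how many readings it has reported. For one device with readings 10 and 20 and another with a single reading of 30, the result is 22.5, while the mean over all three readings would be 20.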

Once a device registers itself at the UniverseGrain, the UniverseGrain also calls the PushNotifierGrain to tell the world about this registration, for example by showing a toast on a webpage.

public async Task RegisterDevice(IDeviceGrain grain)
        {
            //add the device to the universe's list
            grains.Add(grain);
            //also register at SignalR grain
            var notifier = PushNotifierGrainFactory.GetGrain(0);
            await notifier.RegisterDevice(grain);
        }

The DeviceGrain

The DeviceGrain implements several methods. Most of them are obvious, but notice the attribute that decorates the interface.

[ExtendedPrimaryKey]
    public interface IDeviceGrain : Orleans.IGrain
    {
        Task SetLatitude(string latitude);
        Task SetLongitude(string longitude);

        Task<string> GetLatitude();
        Task<string> GetLongitude();
        Task<string> GetDeviceID();
        Task<double> GetAverageTemperature();
        Task<double> GetTemperature();

        Task SetTemperature(double temperature);
        Task RegisterDevice();
    }

By setting this attribute you can use your own proprietary "primary key" mechanism.

var deviceID = Guid.NewGuid().ToString("N");
var device = DeviceGrainFactory.GetGrain(0, deviceID);

The DeviceGrain picks up this deviceID primary key by overriding the ActivateAsync method. Later on, this deviceID is also persisted in "State".

public override async Task ActivateAsync()
        {
            //store the deviceID
            this.GetPrimaryKey(out deviceID);
            this.State.DeviceID = deviceID;
            await base.ActivateAsync();
        }

Registration of a device is simple. Just call RegisterDevice on the UniverseGrain and the device is added to a list in the UniverseGrain.

async Task IDeviceGrain.RegisterDevice()
{
            var universe = UniverseGrainFactory.GetGrain(0);
            await universe.RegisterDevice(this);
}

The PushNotifierGrain (SignalR) is decorated with the StatelessWorker attribute. This means that Orleans can create multiple activations of this Grain for throughput reasons. This can only be done for grains with no state or only immutable state. Since the PushNotifierGrain only holds an "IHubProxy", which is immutable, this can be done.


    [StatelessWorker]
    public interface IPushNotifierGrain : Orleans.IGrain
    {
        Task SendMessage(string deviceid, string temperature);
        Task SendGeneralMessage(string message);
        Task RegisterDevice(IDeviceGrain device);
        Task UpdateTemperature(IDeviceGrain device);
        Task SendMessage(string message);
    }

The temperature is set on a device grain when, for example, a real device somewhere in the world reports its temperature on a Service Bus Topic. One of the Subscriptions is handled by a worker role (in my scenario, the role that maintains the Orleans universe). This is very scalable, since I can scale out this worker role to handle more and more reports from devices.

So, the temperature is set and then the PushNotifierGrain kicks in. The DeviceGrain sets its own current temperature and adds the temperature to its history of temperatures, which is held in its state (so that it can be persisted). Finally, it tells the notifier grain to send this updated temperature to everybody that is involved (in this case, browsers). In my scenario, this changes the text of a pushpin on Bing Maps. The text displays the temperature.

var notifier = PushNotifierGrainFactory.GetGrain(0);
this.State.Temperature = temperature;
temperatures.Add(temperature);
await notifier.UpdateTemperature(this);
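
To make the ordering explicit (update the state first, notify afterwards), here is a small plain-C# sketch without Orleans types. DeviceTemperatureModel is a hypothetical class for illustration only, the notifier grain is replaced by a simple callback, and returning 0.0 for an empty history is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical plain-C# model of the device grain's temperature state.
public class DeviceTemperatureModel
{
    private readonly List<double> history = new List<double>();

    // Latest reported temperature.
    public double Current { get; private set; }

    // Number of readings recorded so far.
    public int ReadingCount { get { return history.Count; } }

    // Average over all recorded readings; 0.0 for an empty history is an assumption.
    public double Average
    {
        get { return history.Count == 0 ? 0.0 : history.Average(); }
    }

    // Updates the state first and notifies afterwards,
    // so listeners always see the value that was just stored.
    public void SetTemperature(double temperature, Action<double> notify)
    {
        Current = temperature;
        history.Add(temperature);
        if (notify != null)
        {
            notify(Current);
        }
    }
}
```

In the grain, the callback corresponds to the call to notifier.UpdateTemperature(this).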


Persistency
I want my universe of Grains to be persistent. If my silo goes down for any reason (locally running a silo in a worker role means that it goes down every time I stop debugging...), the state of the grains needs to be persisted. What data needs to be persisted then? First of all, the current temperature of each and every DeviceGrain, but also the history of temperatures of that particular device. The UniverseGrain also needs some persistence: the next time the silo comes up, I want the DeviceGrains to be there and populated, and the UniverseGrain needs to get its list of DeviceGrains again. So how can I ensure this persistency? I will explain this in one of the next posts.

Observable?
Are Grains observable? Yes they are, but more on that topic in the next post! Enjoy Orleans :-) Thanks to http://hilite.me/ for nicely formatting the C# code.

And thanks @Sergey Bykov from the Orleans team for providing some nice insights!