Tuesday, August 5, 2014

Internet of Things and Event Hub

It's been a while but here is a new blog post on the recently released Event Hub of Microsoft and how to use it in an Internet of Things scenario.

For more information on Event Hub go to the Microsoft site http://azure.microsoft.com/en-us/services/event-hubs.

Consider Event Hub as a vehicle where you can send millions of events per second to offload your devices in your IoT scenario. I had issues before with Service Bus topics related to maximum number of authorizations per second. Event Hub is built on top of Azure Table Storage enabling it to scale almost unlimited. See the figure below to see where Event Hub fits in your architecture (from the Microsoft website).

Event Hubs Architecture


To get things started you need to create an Event Hub in your Service Bus namespace. In this example I created an event hub called "eventhub" in my internetofthings service bus namespace. You can also create event hubs from code.As you can see I have a partition count of 8. This means I need 8 instances/threads/processes to manage every single partition for maximum throughput.



The following code shows how to create senders of events and how to create a receiver.

1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System.Threading;
using Newtonsoft.Json;


namespace EventHubTutorial
{
    public  class SampleEvent
    {
        public string Payload { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var cts = new CancellationTokenSource();

            //create 8 threads, for every partition one.
            for (int i=0;i<8 i="" state="" task.factory.startnew="">
                {
                    Console.WriteLine("Thread for partition: {0}", state);

                    var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", "internetofthings", ""), new MessagingFactorySettings()
                    {
                        TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("Listener", "AsrrSbT/QIxUc73Bvg0t4GqP9O988xHrQDNSCTAjHXc="),
                        TransportType = TransportType.Amqp
                    });

                    var receiver = factory.CreateEventHubClient("eventhub")
                        .GetDefaultConsumerGroup()
                        .CreateReceiver(state.ToString(), DateTime.UtcNow);


                    while (true)
                    {
                        if (cts.IsCancellationRequested)
                        {
                            receiver.Close();
                            break;
                        }

                        // Receive could fail, I would need a retry policy etc...
                        var message = receiver.Receive(TimeSpan.FromMinutes(5));
                        if (message != null)
                        {
                            var logMessage = Newtonsoft.Json.JsonConvert.DeserializeObject(Encoding.Default.GetString(message.GetBytes()));

                            Console.WriteLine("{0}: {1}", DateTime.Now, logMessage.Payload);
                            Console.WriteLine("Event handled from partition {0} by thread {1}. Sequence:{2}", 
                                message.PartitionKey, state, message.SequenceNumber);
                        }
                    }
                }, i);
            }

            //as long as I don't quit...send number of events I enter.
            string input = String.Empty;
            int numberOfEvents = 0;
            //create sender.
            var factorySender = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", "internetofthings", ""), new MessagingFactorySettings()
            {
                TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("Sender", "QYUfWxvoQWJgSeov8qIW3aeQeRgnq+MJ7rCl+CyAclE="),
                TransportType = TransportType.Amqp
            });
            EventHubClient _client = factorySender.CreateEventHubClient("eventhub");
            Random rnd = new Random();
            
            do
            {
                input = Console.ReadLine();
                if (int.TryParse(input, out numberOfEvents))
                {

                    //generate events.
                    Console.WriteLine("Number of events generated:{0}", numberOfEvents);
                    for (int y=0;y<=numberOfEvents; y++)
                    { 
                        var eventData = new EventData(Encoding.Default.GetBytes(JsonConvert.SerializeObject(new SampleEvent()
                        {
                            Payload = "randomdata:" + DateTime.Now.Ticks.ToString()
                        })));

                        //somewhere between 0 and 7
                        eventData.PartitionKey = rnd.Next(0, 7).ToString();
                        _client.Send(eventData);
                    }
                }
            } 
            while (input.CompareTo("quit") != 0);
        }
    }
}



These 100 lines of code demonstrate how to create 8 threads. Every single thread receives message from a partition. Notice that you need to do partitioning yourself. If you send all your messages to the same partition the amount of messages is limited to what a single partition can handle (

Between line 79 and 98 a do while loop is created and sends an amount of messages that you enter in the console app. If you enter "quit" the app exits.

I think the code is pretty straightforward :-)

Remember that the next step would be to route the received message to a more permanent store like Service Bus or Orleans for further processing....the Event Hub