Tuesday, June 30, 2015

Orleans and the Azure Redis Cache


Recently, I've been diving into Orleans again, the great open-source framework that provides an approach for building large-scale, distributed computing applications. It takes care of concurrency and scaling patterns for you, so you can focus on the application itself.

I have blogged about Orleans before, explaining what Grains are and what they can do for you.

At the moment, I am designing and building a large IoT solution in which Orleans is used to keep the state of Raspberry Pi 2 devices out in the field. By default, grain state is persisted in Azure Table Storage, but I decided to use Redis instead.

To build your own storage provider for Orleans, you need to create a class that implements IStorageProvider. See the full implementation below.

    public class RedisStorage : IStorageProvider

    {
        private ConnectionMultiplexer connectionMultiplexer;
        private IDatabase redisDatabase;

        private const string REDIS_CONNECTION_STRING = "RedisConnectionString";
        private const string REDIS_DATABASE_NUMBER = "DatabaseNumber";

        private string serviceId;

        /// <summary> Name of this storage provider instance. </summary>
        /// <see cref="IProvider#Name"/>
        public string Name { get; private set; }

        /// <summary> Logger used by this storage provider instance. </summary>
        /// <see cref="IStorageProvider#Log"/>
        public Logger Log { get; private set; }


        /// <summary> Initialization function for this storage provider. </summary>
        /// <see cref="IProvider#Init"/>
        public async Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config)
        {
            Name = name;
            serviceId = providerRuntime.ServiceId.ToString();

            if (!config.Properties.ContainsKey(REDIS_CONNECTION_STRING) ||
                string.IsNullOrWhiteSpace(config.Properties[REDIS_CONNECTION_STRING]))
            {
                throw new ArgumentException("RedisConnectionString is not set.");
            }
            var connectionString = config.Properties[REDIS_CONNECTION_STRING];

            connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(connectionString);

            if (!config.Properties.ContainsKey(REDIS_DATABASE_NUMBER) ||
                string.IsNullOrWhiteSpace(config.Properties[REDIS_DATABASE_NUMBER]))
            {
                //do not throw an ArgumentException but use the default database
                redisDatabase = connectionMultiplexer.GetDatabase();
            }
            else
            {
                var databaseNumber = Convert.ToInt16(config.Properties[REDIS_DATABASE_NUMBER]);
                redisDatabase = connectionMultiplexer.GetDatabase(databaseNumber);
            }

            Log = providerRuntime.GetLogger("StorageProvider.RedisStorage." + serviceId);
        }

        // Internal method to initialize for testing
        internal void InitLogger(Logger logger)
        {
            Log = logger;
        }

        /// <summary> Shutdown this storage provider. </summary>
        /// <see cref="IStorageProvider#Close"/>
        public Task Close()
        {
            connectionMultiplexer.Dispose();
            return TaskDone.Done;
        }

        /// <summary> Read state data function for this storage provider. </summary>
        /// <see cref="IStorageProvider#ReadStateAsync"/>
        public async Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            var primaryKey = grainReference.ToKeyString();

            if (Log.IsVerbose3)
            {
                Log.Verbose3((int)ProviderErrorCode.RedisStorageProvider_ReadingData, "Reading: GrainType={0} Pk={1} Grainid={2} from Database={3}", grainType, primaryKey, grainReference, redisDatabase.Database);
            }

            RedisValue value = await redisDatabase.StringGetAsync(primaryKey);
            var data = new Dictionary<string, object>();
            if (value.HasValue)
            {
                data = JsonConvert.DeserializeObject<Dictionary<string, object>>(value);
            }

            grainState.SetAll(data);

            // No ETag is kept in Redis, so a fresh one is generated on every read.
            grainState.Etag = Guid.NewGuid().ToString();
        }

        /// <summary> Write state data function for this storage provider. </summary>
        /// <see cref="IStorageProvider#WriteStateAsync"/>
        public async Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            var primaryKey = grainReference.ToKeyString();
            if (Log.IsVerbose3)
            {
                Log.Verbose3((int)ProviderErrorCode.RedisStorageProvider_WritingData, "Writing: GrainType={0} PrimaryKey={1} Grainid={2} ETag={3} to Database={4}", grainType, primaryKey, grainReference, grainState.Etag, redisDatabase.Database);
            }
            var data = grainState.AsDictionary();

            var json = JsonConvert.SerializeObject(data);
            await redisDatabase.StringSetAsync(primaryKey, json);
        }

        /// <summary> Clear state data function for this storage provider. </summary>
        /// <remarks>
        /// </remarks>
        /// <see cref="IStorageProvider#ClearStateAsync"/>
        public Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            var primaryKey = grainReference.ToKeyString();
            if (Log.IsVerbose3)
            {
                Log.Verbose3((int)ProviderErrorCode.RedisStorageProvider_ClearingData, "Clearing: GrainType={0} Pk={1} Grainid={2} ETag={3} from Database={4}", grainType, primaryKey, grainReference, grainState.Etag, redisDatabase.Database);
            }
            //remove the grain's state from the cache
            return redisDatabase.KeyDeleteAsync(primaryKey);
        }
    }


The code is pretty straightforward. To use the provider, decorate your grain class with the StorageProvider attribute:

[StorageProvider(ProviderName = "RedisStorage")]

public class HelloGrain : Orleans.Grain<IHelloState>, HelloWorldInterfaces.IHello


The state of HelloGrain will now be persisted in Redis instead of Azure Table Storage.
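
For context, this is roughly what the grain side could look like with the Orleans grain state model, where the state is declared as an interface deriving from IGrainState and saved via State.WriteStateAsync(). The LastGreeting property and the SayHello method are made up for illustration; the provider itself is registered by name (RedisStorage) in the silo configuration, together with the RedisConnectionString and DatabaseNumber properties that Init reads.

using System.Threading.Tasks;
using Orleans;
using Orleans.Providers;

public interface IHelloState : IGrainState
{
    string LastGreeting { get; set; }
}

[StorageProvider(ProviderName = "RedisStorage")]
public class HelloGrain : Orleans.Grain<IHelloState>, HelloWorldInterfaces.IHello
{
    public async Task<string> SayHello(string greeting)
    {
        // Update the in-memory state and persist it through the RedisStorage provider.
        State.LastGreeting = greeting;
        await State.WriteStateAsync();

        return "You said: " + greeting;
    }
}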

See how my cache gets hit all the time!



I will also upload this to the OrleansContrib page on GitHub at https://github.com/OrleansContrib/Orleans.StorageProvider.Redis. I will keep you posted on the progress on my blog.


Monday, June 15, 2015

Connect your devices to Azure!


How to connect your devices to IoT assets on Azure?
I’ve been working with devices and Azure for quite some time now and have already built alot of (backend) services including Stream Analytics jobs for analyzing and alerting but also on Servicebus Topics and Subscriptions.
I have a solution that is multi-tenant and provides provisioning logic for tenants and devices. One can register and create a tenant, and by using the Service Management Libraries I am able to provision a new Service Bus namespace for that specific tenant only. The system returns an ApiKey which must be used in calls to that specific tenant.
After the creation of a tenant, devices can register themselves (or be preregistered in a portal) by calling a “registerdevice” Web API with the ApiKey in the HTTP header (a sketch of such a call is shown below the list). The multi-tenant system is then able to register the device and return the information the device needs to communicate with the Service Bus assets of that tenant. So what can a device do?
1. Listen for specific commands sent to the device or a group of devices. For this purpose, the provisioning system creates a topic (commands) and, for every device that registers, a designated subscription.
2. Send events to a topic. For this purpose, the provisioning system enables a registered device to send (and only send) messages to a topic called ‘events’.
3. Send telemetry data to an Event Hub. The device can send high-frequency data like GPS or temperature measurements to the Event Hub, where other systems can process, evaluate, analyze and report on it.
Every device follows this pattern, which makes it easy to extend the system with tens, hundreds or thousands of devices.
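For illustration, here is a minimal sketch of the registration call mentioned above. The header name, route and payload shape are assumptions, since the actual Web API is part of my provisioning system:

using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

public static class DeviceRegistration
{
    public static async Task<string> RegisterAsync(string baseUrl, string apiKey, string deviceId)
    {
        using (var client = new HttpClient())
        {
            // The ApiKey returned during tenant provisioning identifies the tenant.
            client.DefaultRequestHeaders.Add("ApiKey", apiKey);

            var payload = new StringContent(
                "{ \"deviceId\": \"" + deviceId + "\" }", Encoding.UTF8, "application/json");

            var response = await client.PostAsync(baseUrl + "/api/registerdevice", payload);
            response.EnsureSuccessStatusCode();

            // The response contains the Service Bus entities and SAS tokens for this device.
            return await response.Content.ReadAsStringAsync();
        }
    }
}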
Remember that a topic can have at most 2,000 subscriptions; that's why I use auto-forwarding and fan-out. The single entry point into the “commanding” system is a topic with multiple subscriptions, depending on the size of the system (meaning, the number of devices). E.g. a system containing 10,000 devices needs 5 additional Service Bus topics, each containing 2,000 subscriptions. Sending one command to the “master topic” results in auto-forwarding to the 5 additional topics, enabling the system to deliver a single command to each and every one of the 10,000 devices if needed.
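To make the fan-out concrete, here is a sketch of how the master topic and the partition topics could be wired up with auto-forwarding subscriptions, using the Microsoft.ServiceBus NamespaceManager. The names (commands, commands-0, forward-to-N) are assumptions for illustration:

using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

public static class CommandFanOut
{
    public static async Task CreateAsync(string connectionString, int partitionCount)
    {
        var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

        // The single entry point: the master "commands" topic.
        await namespaceManager.CreateTopicAsync("commands");

        for (var partition = 0; partition < partitionCount; partition++)
        {
            var partitionTopic = string.Format("commands-{0}", partition);
            await namespaceManager.CreateTopicAsync(partitionTopic);

            // One auto-forwarding subscription per partition: every command sent to the
            // master topic is forwarded, and devices subscribe on the partition topics.
            var forwarder = new SubscriptionDescription("commands", "forward-to-" + partition)
            {
                ForwardTo = partitionTopic
            };
            await namespaceManager.CreateSubscriptionAsync(forwarder);
        }
    }
}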
Now that we have the provisioning logic in place, we need to connect devices!
I’ve built device agents that can run on devices or systems and currently support .NET and Python (and will add more in the future). The .NET device agent can run on any device that can support .NET obviously. The python agent can be run e.g. on a Rasperry Pi (2).
The provisioning system generates Shared Access Signatures (SAS) for:
– the subscription of the device, so that only the device has listen access
– the ‘events’ topic, so that the device has send rights to post back incidental events like heartbeats or status information
– the Event Hub of the tenant, so that the device can post telemetry data.
In the backend provisioning system, this happens:
string listenSaS = SharedAccessSignatureTokenProvider.GetSharedAccessSignature(
    "CommandTopicListenRule", ListenSAS, subUri.ToString().Trim('/'), TimeSpan.FromDays(365 * 10));
The listenSaS variable contains a SAS that is generated based on the CommandTopicListenRule shared access policy as configured in the Azure portal (see below). The SAS is valid for 10 years. The provisioning system contains functions to freeze a device, e.g. when it has been compromised.
[Screenshot: the CommandTopicListenRule shared access policy in the Azure portal]
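The send tokens for the ‘events’ topic and the Event Hub follow the same pattern. A sketch, assuming shared access policies named EventsTopicSendRule and EventHubSendRule and the corresponding keys and entity URIs from the provisioning data (all of these names are illustrative):

string sendEventsSaS = SharedAccessSignatureTokenProvider.GetSharedAccessSignature(
    "EventsTopicSendRule", eventsSendKey, eventsTopicUri.ToString().Trim('/'), TimeSpan.FromDays(365 * 10));

string sendTelemetrySaS = SharedAccessSignatureTokenProvider.GetSharedAccessSignature(
    "EventHubSendRule", eventHubSendKey, eventHubUri.ToString().Trim('/'), TimeSpan.FromDays(365 * 10));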
The current version of the Azure SDK for Python supports only ACS (which is old-school) and a combination of shared_access_key_name and shared_access_key_value. Since my system generates SAS tokens only, I had to change the Python Service Bus SDK to support SAS-only authentication as well.
The change I made is:
At line 129 add:
elif shared_access_key_value:
    self.authentication = ServiceBusSASAuthentication(key_value=shared_access_key_value)
and in the signing of a request, replace the function with:
def sign_request(self, request, httpclient):
    if self.key_value is not None and self.key_name is None:
        request.headers.append(('Authorization', self.key_value))
    else:
        request.headers.append(('Authorization', self._get_authorization(request, httpclient)))
This piece of Python goes at line 1190. Save servicebusservice.py and you are now also able to use SAS-only authentication!
The Python device agent I created is simple to use:
self.agent = Agent(<apikey>, url=<url>)
# register the device with its MAC address
self.agent.register_device(getnode())
# respond to incoming commands, just print for now
self.agent.on_messagereceived = self.showmessage
# start the Python device agent!
self.agent.start()
# send a message to the event hub of this tenant, for example:
msg = Message('Agent started on {0}'.format(self.agent.deviceId).encode('utf-8'), custom_properties={})
self.agent.send_telemetry(msg)
# send an event to the events topic of the system
msg2 = Message('Agent started on {0}'.format(self.agent.deviceId).encode('utf-8'), custom_properties={})
self.agent.send_event(msg2)
I use Service Bus Explorer to test the agent and to send messages to this specific device. I can target a specific device by adding a message property containing the device id. See below.
[Screenshot: setting the device id message property in Service Bus Explorer]
Now I click Start and the message is sent to the commands topic in the tenant-specific environment. The auto-forwarding setup ensures that the message is forwarded to commands-0 (first partition) and commands-1 (second partition). Since every device has its own subscription with a SqlFilter on it containing the deviceId, this message only flows to the subscription of the device I specified in the message property in Service Bus Explorer.
[Screenshot: Service Bus Explorer showing the command topics and device subscriptions]
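For completeness, a sketch of the per-device routing described above: a subscription with a SqlFilter on a DeviceId property is created for each registered device, and a command is addressed to one device by setting that property on the message. The entity and property names are assumptions for illustration:

using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

public static class DeviceCommandRouting
{
    public static void RegisterDeviceSubscription(string connectionString, string partitionTopic, string deviceId)
    {
        var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

        // Only messages whose DeviceId property matches end up in this device's subscription.
        var filter = new SqlFilter(string.Format("DeviceId = '{0}'", deviceId));
        namespaceManager.CreateSubscription(partitionTopic, deviceId, filter);
    }

    public static void SendCommandToDevice(string connectionString, string masterTopic, string deviceId, string command)
    {
        var client = TopicClient.CreateFromConnectionString(connectionString, masterTopic);

        // The DeviceId property is evaluated by the SqlFilter on each device subscription.
        var message = new BrokeredMessage(command);
        message.Properties["DeviceId"] = deviceId;
        client.Send(message);
    }
}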
The output of the Python device agent is:
[Screenshot: output of the Python device agent]
As you can see, the agent started and sent one event to the ‘events’ topic, and the message sent from Service Bus Explorer was printed to the screen together with its payload.
This piece of Python can easily be copied to a Raspberry Pi, enabling the Pi to join the IoT system!
For questions or feedback, don’t hesitate!