Recently, I've been diving into Orleans again, the great open-source framework that provides an approach for building large-scale, distributed computing applications. It takes care of concurrency and scaling patterns for you, so you can focus on the application itself.
I have blogged about Orleans before, including an explanation of grains and what they can do for you.
At the moment, I am designing and building a large IoT solution, and Orleans is used to keep the state of Raspberry Pi 2 devices out in the field. By default, grain state is persisted in Azure Table Storage, but I decided to use Redis instead.
To build your own storage provider for Orleans, you need to write a class that implements IStorageProvider. See the full implementation below.
/// <summary>
/// Orleans storage provider that persists grain state as a JSON string in Redis.
/// </summary>
/// <remarks>
/// State is stored under the key returned by <c>GrainReference.ToKeyString()</c>.
/// Provider configuration properties:
/// <c>RedisConnectionString</c> (required) and <c>DatabaseNumber</c> (optional;
/// the connection's default database is used when it is missing or blank).
/// </remarks>
public class RedisStorage : IStorageProvider
{
    private const string RedisConnectionStringKey = "RedisConnectionString";
    private const string RedisDatabaseNumberKey = "DatabaseNumber";

    private ConnectionMultiplexer connectionMultiplexer;
    private IDatabase redisDatabase;
    private string serviceId;

    /// <summary> Name of this storage provider instance. </summary>
    /// <see cref="IProvider.Name"/>
    public string Name { get; private set; }

    /// <summary> Logger used by this storage provider instance. </summary>
    /// <see cref="IStorageProvider.Log"/>
    public Logger Log { get; private set; }

    /// <summary> Initialization function for this storage provider. </summary>
    /// <param name="name">Name assigned to this provider instance.</param>
    /// <param name="providerRuntime">Runtime used to obtain the service id and the logger.</param>
    /// <param name="config">Provider configuration holding the Redis settings.</param>
    /// <exception cref="ArgumentException">The connection string property is missing or blank.</exception>
    /// <exception cref="FormatException">The database number property is not a valid number.</exception>
    /// <exception cref="OverflowException">The database number property does not fit in an Int16.</exception>
    /// <see cref="IProvider.Init"/>
    public async Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config)
    {
        Name = name;
        serviceId = providerRuntime.ServiceId.ToString();

        string connectionString;
        if (!config.Properties.TryGetValue(RedisConnectionStringKey, out connectionString) ||
            string.IsNullOrWhiteSpace(connectionString))
        {
            throw new ArgumentException("RedisConnectionString is not set.");
        }

        // Parse the optional database number BEFORE connecting, so that a malformed
        // value fails fast instead of leaving an open connection behind.
        int? databaseNumber = null;
        string databaseNumberText;
        if (config.Properties.TryGetValue(RedisDatabaseNumberKey, out databaseNumberText) &&
            !string.IsNullOrWhiteSpace(databaseNumberText))
        {
            databaseNumber = Convert.ToInt16(databaseNumberText);
        }

        connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(connectionString);

        // A missing database number is not an error: fall back to the default database.
        redisDatabase = databaseNumber.HasValue
            ? connectionMultiplexer.GetDatabase(databaseNumber.Value)
            : connectionMultiplexer.GetDatabase();

        Log = providerRuntime.GetLogger("StorageProvider.RedisStorage." + serviceId);
    }

    // Internal method to initialize for testing
    internal void InitLogger(Logger logger)
    {
        Log = logger;
    }

    /// <summary> Shutdown this storage provider. </summary>
    /// <remarks> Safe to call when Init never completed, and safe to call more than once. </remarks>
    /// <see cref="IStorageProvider.Close"/>
    public Task Close()
    {
        // Init may have thrown before the connection was created.
        if (connectionMultiplexer != null)
        {
            connectionMultiplexer.Dispose();
            connectionMultiplexer = null;
        }

        return TaskDone.Done;
    }

    /// <summary> Read state data function for this storage provider. </summary>
    /// <remarks>
    /// Loads the JSON stored under the grain's key and applies it to
    /// <paramref name="grainState"/> via <c>SetAll</c>; an empty dictionary is applied
    /// when nothing is stored. A fresh GUID is assigned as the ETag on every read.
    /// </remarks>
    /// <see cref="IStorageProvider.ReadStateAsync"/>
    public async Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
    {
        var primaryKey = grainReference.ToKeyString();
        if (Log.IsVerbose3)
        {
            Log.Verbose3((int)ProviderErrorCode.RedisStorageProvider_ReadingData, "Reading: GrainType={0} Pk={1} Grainid={2} from Database={3}", grainType, primaryKey, grainReference, redisDatabase.Database);
        }

        RedisValue value = await redisDatabase.StringGetAsync(primaryKey);

        Dictionary<string, object> data = null;
        if (value.HasValue)
        {
            data = JsonConvert.DeserializeObject<Dictionary<string, object>>(value);
        }

        // DeserializeObject yields null for a stored JSON "null"; never hand null to SetAll.
        grainState.SetAll(data ?? new Dictionary<string, object>());
        grainState.Etag = Guid.NewGuid().ToString();
    }

    /// <summary> Write state data function for this storage provider. </summary>
    /// <remarks>
    /// Serializes <c>grainState.AsDictionary()</c> to JSON and stores it under the grain's key,
    /// overwriting any previous value.
    /// </remarks>
    /// <see cref="IStorageProvider.WriteStateAsync"/>
    public async Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
    {
        var primaryKey = grainReference.ToKeyString();
        if (Log.IsVerbose3)
        {
            Log.Verbose3((int)ProviderErrorCode.RedisStorageProvider_WritingData, "Writing: GrainType={0} PrimaryKey={1} Grainid={2} ETag={3} to Database={4}", grainType, primaryKey, grainReference, grainState.Etag, redisDatabase.Database);
        }

        var data = grainState.AsDictionary();
        var json = JsonConvert.SerializeObject(data);
        await redisDatabase.StringSetAsync(primaryKey, json);
    }

    /// <summary> Clear state data function for this storage provider. </summary>
    /// <remarks> Deletes the grain's key from the Redis database. </remarks>
    /// <see cref="IStorageProvider.ClearStateAsync"/>
    public async Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
    {
        var primaryKey = grainReference.ToKeyString();
        if (Log.IsVerbose3)
        {
            // The placeholders must match the argument count exactly: five of each.
            Log.Verbose3((int)ProviderErrorCode.RedisStorageProvider_ClearingData, "Clearing: GrainType={0} Pk={1} Grainid={2} ETag={3} from Database={4}", grainType, primaryKey, grainReference, grainState.Etag, redisDatabase.Database);
        }

        // Use the async delete so the calling thread is not blocked on the Redis round-trip.
        await redisDatabase.KeyDeleteAsync(primaryKey);
    }
}
The code is pretty straightforward and you can decorate your grain class with the right attribute.
[StorageProvider(ProviderName = "RedisStorage")]
public class HelloGrain : Orleans.Grain<IHelloState>, HelloWorldInterfaces.IHello
The state of HelloGrain will now be persisted in Redis instead of Azure Table Storage.
See how my cache gets hit all the time!
I will also upload this to the OrleansContrib page on GitHub at https://github.com/OrleansContrib/Orleans.StorageProvider.Redis. I will keep you posted on the progress on my blog.
No comments:
Post a Comment