Monday, December 10, 2012

Chaining Topics and Subscriptions

With the release of Windows Azure SDK 1.8 a new useful way of chaining queues or topics together is available. The ForwardTo method allows you to "connect" queues with eachother and implement scale out scenarios.

Consider a single Service Bus topic receiving purchase orders. After a while you notice that processing the orders by consumers takes more time than desired. To enable scale out scenarios where more consumers can take messages at a time you can choose to distribute messages to different topics based on e.g. region.

The code snippet below demonstrates how to use the initial topic scenario where messages are pushed to a subscription based on region. This snippet sends only one message to the EMEA region and then some sort of random based on Ticks. This snippet can be used to populate the Topic and its subscriptions.


            string ServiceNamespace = "yournamespace";
            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider("issuer",
                "yourkey");
            // Create namespace client
            NamespaceManager namespaceClient = new NamespaceManager(
                ServiceBusEnvironment.CreateServiceUri("sb", ServiceNamespace, string.Empty), credentials);
            TopicDescription poTopic = null;

            if (!namespaceClient.TopicExists("PurchaseOrderTopic"))
            {
                poTopic = namespaceClient.CreateTopic("PurchaseOrderTopic");
            }
            else
            {
                poTopic = namespaceClient.GetTopic("PurchaseOrderTopic");
            }

            MessagingFactory factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb",
                ServiceNamespace, string.Empty), credentials);

            TopicClient myTopicClient = factory.CreateTopicClient(poTopic.Path);


            SqlFilter EmeaFilter = new SqlFilter("Region = 'EMEA'");
            SqlFilter UsaFilter = new SqlFilter("Region = 'USA'");


            if (!namespaceClient.SubscriptionExists(poTopic.Path, "EMEA"))
            {
                SubscriptionDescription myAgentSubscription = namespaceClient.CreateSubscription(poTopic.Path, "EMEA",
                    EmeaFilter);
            }

            if (!namespaceClient.SubscriptionExists(poTopic.Path, "USA"))
            {
                SubscriptionDescription myAuditSubscription = namespaceClient.CreateSubscription(poTopic.Path, "USA",
                    UsaFilter);
            }

            PurchaseOrder order = new PurchaseOrder()
                {
                     Amount = 100,
                     Article = "Car",
                     Region = "EMEA"
                };

            BrokeredMessage POMessage = new BrokeredMessage(order);
            POMessage.Properties["Region"] = order.Region;

            Console.WriteLine("Sending initial message to EMEA");
            myTopicClient.Send(POMessage);


            while (Console.ReadLine() != "q")
            {
                POMessage = new BrokeredMessage(order);

                if (DateTime.Now.Ticks % 2 == 0)
                {
                    Console.WriteLine("Sending another message to EMEA");
                    POMessage.Properties["Region"] = "EMEA";
                }
                else
                {
                    Console.WriteLine("Sending another message to USA");
                    POMessage.Properties["Region"] = "USA";
                }
                myTopicClient.Send(POMessage);
            }



Pressing several times will add random messages.

The next blog post demonstrates how to redistribute messages based on their country by using Auto-forwarding feature of the Service Bus.