Wednesday, March 2, 2011

Implementing the Publisher-Subscriber Model in WCF



Prerequisites
Readers should know how to create a basic WCF service and how to consume it.

Here I try to illustrate how to implement the publisher-subscriber model in WCF.
To explain it, let me take a common real-world scenario: a stock exchange, where investors register with a service and the service notifies all of them when the stock price changes.
My intention is to help you understand how the publisher-subscriber model works in WCF, not to solve the stock exchange problem itself.
Duplex Operations
We will use duplex operations to achieve this.
With a duplex operation, the client's request carries information about an endpoint on the client, and the service uses that endpoint to call back to the client.
The service must be hosted with a binding that supports duplex communication. The following bindings support it:
·         WSDualHttpBinding
·         NetTcpBinding
·         NetNamedPipeBinding

Let us start implementing duplex operations so that the idea becomes clearer.

For the stock exchange scenario we need three parts: the Service, the Investor and the Stock Exchange.

Service
RegisterInvestorService: Investors use this service to register or deregister for notifications of stock price changes.
StockExchangeService: The stock exchange uses this service to update the price. When the price is updated, the service informs all investors registered with it.

Investor
The investor consumes RegisterInvestorService, registers itself with the service and then receives notifications from it.

Stock Exchange
The stock exchange consumes StockExchangeService and updates the price.


Service Implementation

Create the service contracts as shown below.
    using System;
    using System.ServiceModel;

    // Called by investors to register or deregister for price notifications.
    [ServiceContract(CallbackContract = typeof(IRegisterInvestorServiceCallback))]
    public interface IRegisterInvestorService
    {
        [OperationContract(IsOneWay = true)]
        void RegisterInvestor(Guid investorid);

        [OperationContract(IsOneWay = true)]
        void DeRegisterInvestor(Guid investorid);
    }

    // Callback contract: implemented by the client and called by the service.
    public interface IRegisterInvestorServiceCallback
    {
        [OperationContract(IsOneWay = true)]
        void StockUpdated(double price);
    }

    // Called by the stock exchange to push a new price to the service.
    [ServiceContract]
    public interface IStockExcahangeService
    {
        [OperationContract(IsOneWay = true)]
        void UpdateStock(double price);
    }

Let me explain the purpose of each interface.
IRegisterInvestorService: Investors use this contract to register or deregister for notifications of stock price changes.
IRegisterInvestorServiceCallback: The service uses this contract to notify clients when the stock price changes. Each client implements it and decides what to do when the price changes.
IStockExcahangeService: The stock exchange uses this contract to tell the service that the price has changed. The service then notifies all registered clients.
Two things in the code above are worth noting:
1.       CallbackContract: names the callback contract for this service contract. The service uses that callback contract to notify the clients that registered with it.
2.       The interface IRegisterInvestorServiceCallback has an OperationContract but no ServiceContract attribute: this is the callback contract, which is implemented by the client (the investor) and called by the service.
Create the class below in the service to store the registered investors. It shows how an investor is registered and deregistered, and how every registered client is notified when the price is updated.
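    public static class MicrosoftStock
    {
        // Guards the dictionary, because calls from different clients can arrive concurrently.
        private static readonly object syncRoot = new object();

        // Callback channels of the registered investors, keyed by investor id.
        private static readonly Dictionary<Guid, IRegisterInvestorServiceCallback> registeredInvestors =
            new Dictionary<Guid, IRegisterInvestorServiceCallback>();

        // Notifies every registered investor of the new price.
        public static void UpdateStock(double price)
        {
            List<IRegisterInvestorServiceCallback> callbacks;
            lock (syncRoot)
            {
                // Copy first so that the callbacks run outside the lock.
                callbacks = new List<IRegisterInvestorServiceCallback>(registeredInvestors.Values);
            }

            foreach (IRegisterInvestorServiceCallback callback in callbacks)
            {
                callback.StockUpdated(price);
            }
        }

        public static void RegisterInvestor(Guid investorid,
            IRegisterInvestorServiceCallback notifyInvestor)
        {
            lock (syncRoot)
            {
                // The indexer replaces an existing entry instead of throwing like Add.
                registeredInvestors[investorid] = notifyInvestor;
            }
        }

        public static void DegisterInvestor(Guid investorid)
        {
            lock (syncRoot)
            {
                registeredInvestors.Remove(investorid);
            }
        }
    }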

Note that MicrosoftStock is static, so the registered investors are shared by every service instance, including the StockExchangeService instance that publishes the price.
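One case the registry does not cover is an investor that closes its console window without deregistering: its callback channel stays in the dictionary and a later notification can fail. The following is a minimal sketch of one way to handle that, not the article's implementation. The class name SafeStockRegistry and its members are hypothetical; the only WCF types used are ICommunicationObject, CommunicationState and CommunicationException.

```csharp
using System;
using System.Collections.Generic;
using System.ServiceModel;

// Callback contract as declared in the article.
public interface IRegisterInvestorServiceCallback
{
    [OperationContract(IsOneWay = true)]
    void StockUpdated(double price);
}

// Hypothetical variant of MicrosoftStock that drops callbacks whose channel has failed.
public static class SafeStockRegistry
{
    private static readonly object sync = new object();
    private static readonly Dictionary<Guid, IRegisterInvestorServiceCallback> investors =
        new Dictionary<Guid, IRegisterInvestorServiceCallback>();

    public static int Count
    {
        get { lock (sync) { return investors.Count; } }
    }

    public static void Register(Guid id, IRegisterInvestorServiceCallback callback)
    {
        lock (sync) { investors[id] = callback; }
    }

    public static void Deregister(Guid id)
    {
        lock (sync) { investors.Remove(id); }
    }

    public static void Publish(double price)
    {
        // Work on a snapshot so that entries can be removed while iterating.
        List<KeyValuePair<Guid, IRegisterInvestorServiceCallback>> snapshot;
        lock (sync)
        {
            snapshot = new List<KeyValuePair<Guid, IRegisterInvestorServiceCallback>>(investors);
        }

        foreach (KeyValuePair<Guid, IRegisterInvestorServiceCallback> entry in snapshot)
        {
            // A WCF callback proxy also implements ICommunicationObject, which exposes its state.
            ICommunicationObject channel = entry.Value as ICommunicationObject;
            if (channel != null && channel.State != CommunicationState.Opened)
            {
                Deregister(entry.Key);
                continue;
            }

            try
            {
                entry.Value.StockUpdated(price);
            }
            catch (CommunicationException) { Deregister(entry.Key); }
            catch (TimeoutException) { Deregister(entry.Key); }
            catch (ObjectDisposedException) { Deregister(entry.Key); }
        }
    }
}
```

With a registry like this, one disconnected investor does not stop the remaining investors from being notified.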

Implement IRegisterInvestorService
    public class RegisterInvestorService : IRegisterInvestorService
    {
        public void RegisterInvestor(Guid investorid)
        {
            // The callback channel points back to the client that made this call.
            IRegisterInvestorServiceCallback callback =
                OperationContext.Current.GetCallbackChannel<IRegisterInvestorServiceCallback>();
            MicrosoftStock.RegisterInvestor(investorid, callback);
        }

        public void DeRegisterInvestor(Guid investorid)
        {
            MicrosoftStock.DegisterInvestor(investorid);
        }
    }


Implement IStockExcahangeService
    public class StockExchangeService : IStockExcahangeService
    {
        public void UpdateStock(double price)
        {
            MicrosoftStock.UpdateStock(price);
        }
    }


Below is the config for the service. It is the same as for any other WCF service, except that RegisterInvestorService uses wsDualHttpBinding, which supports duplex communication as mentioned earlier.
  <system.serviceModel>
    <services>
      <service behaviorConfiguration="serviceBehavior" name="Stock.RegisterInvestorService">
        <endpoint address="wsDual" binding="wsDualHttpBinding" contract="Stock.IRegisterInvestorService" />
      </service>
      <service behaviorConfiguration="serviceBehavior" name="Stock.StockExchangeService">
        <endpoint address="basicHTTP" binding="basicHttpBinding" contract="Stock.IStockExcahangeService" />
      </service>
    </services>
    <behaviors>
      <serviceBehaviors>
        <behavior name="serviceBehavior">
          <serviceMetadata httpGetEnabled="true" />
          <serviceDebug includeExceptionDetailInFaults="false" />
        </behavior>
        <behavior name="">
          <serviceMetadata httpGetEnabled="true" />
          <serviceDebug includeExceptionDetailInFaults="false" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <serviceHostingEnvironment multipleSiteBindingsEnabled="true" />
  </system.serviceModel>


Now the service is ready. I leave it to you how you host the service.
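If you just want to try the service quickly, self-hosting in a console application is one option. The following is a rough sketch under some assumptions, not the hosting used in this article: it uses NetTcpBinding (also duplex-capable) instead of wsDualHttpBinding, because an HTTP listener usually needs a URL reservation or administrator rights. The namespace Stock.HostSketch, the class SelfHost, the port 8523 and the empty stand-in service body are all hypothetical.

```csharp
using System;
using System.ServiceModel;

namespace Stock.HostSketch
{
    // Minimal copies of the article's contracts so that the sketch compiles on its own.
    public interface IRegisterInvestorServiceCallback
    {
        [OperationContract(IsOneWay = true)]
        void StockUpdated(double price);
    }

    [ServiceContract(CallbackContract = typeof(IRegisterInvestorServiceCallback))]
    public interface IRegisterInvestorService
    {
        [OperationContract(IsOneWay = true)]
        void RegisterInvestor(Guid investorid);

        [OperationContract(IsOneWay = true)]
        void DeRegisterInvestor(Guid investorid);
    }

    // Stand-in implementation; the real one stores the callback as shown in the article.
    public class RegisterInvestorService : IRegisterInvestorService
    {
        public void RegisterInvestor(Guid investorid) { }
        public void DeRegisterInvestor(Guid investorid) { }
    }

    public static class SelfHost
    {
        // Opens the service on a duplex-capable TCP endpoint and returns the running host.
        public static ServiceHost Open(string baseAddress)
        {
            ServiceHost host = new ServiceHost(typeof(RegisterInvestorService), new Uri(baseAddress));
            host.AddServiceEndpoint(typeof(IRegisterInvestorService), new NetTcpBinding(), "register");
            host.Open();
            return host;
        }

        public static void Main()
        {
            // The address and port are assumptions made for this sketch.
            ServiceHost host = Open("net.tcp://localhost:8523/Stock");
            Console.WriteLine("Service running. Press Enter to stop.");
            Console.ReadLine();
            host.Close();
        }
    }
}
```

The endpoint is added in code here, so this sketch does not read the system.serviceModel section of the config file.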
Investor Implementation
This is a simple console application which consumes RegisterInvestorService (I hope you are aware of how to consume a WCF service).

The first step in the investor is to implement IRegisterInvestorServiceCallback, which is where the notifications arrive. In this implementation the investor can take some action when the price changes.

[CallbackBehavior(UseSynchronizationContext = false)]
class StockExcahageUpdates : IRegisterInvestorServiceCallback
{
    // Called by the service each time the stock price changes.
    public void StockUpdated(double price)
    {
        Console.WriteLine("Stock Updated: " + price.ToString());
    }
}


The second step is to register the investor with the service and wait for notifications.

In this step you will notice that the service client accepts an InstanceContext as a parameter, which is not required for a traditional request/reply service.
The InstanceContext tells the service whom to notify when the price changes. In other words, it represents the client's endpoint that the service calls back to.

class Program
{
    static void Main(string[] args)
    {
        // Wrap the callback implementation so that the service can call back into it.
        InstanceContext callbackInstance = new InstanceContext(new StockExcahageUpdates());

        InvestorService.RegisterInvestorServiceClient proxy = new InvestorService.RegisterInvestorServiceClient(callbackInstance);

        proxy.RegisterInvestor(Guid.NewGuid());

        // Keep the process alive so that notifications can arrive.
        Console.WriteLine("Observing the Stock...");
        Console.ReadKey();
    }
}
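The generated proxy is not the only way to hand the InstanceContext to WCF. As a sketch of the same step without a service reference, the client can build the channel with DuplexChannelFactory. The namespace Stock.ClientSketch, the classes ConsoleInvestor and InvestorClient, the NetTcpBinding choice and the address are assumptions made for illustration; the binding and address must match whatever endpoint your host exposes.

```csharp
using System;
using System.ServiceModel;

namespace Stock.ClientSketch
{
    // Minimal copies of the article's contracts so that the sketch compiles on its own.
    public interface IRegisterInvestorServiceCallback
    {
        [OperationContract(IsOneWay = true)]
        void StockUpdated(double price);
    }

    [ServiceContract(CallbackContract = typeof(IRegisterInvestorServiceCallback))]
    public interface IRegisterInvestorService
    {
        [OperationContract(IsOneWay = true)]
        void RegisterInvestor(Guid investorid);

        [OperationContract(IsOneWay = true)]
        void DeRegisterInvestor(Guid investorid);
    }

    // Hypothetical callback implementation that prints each price it receives.
    public class ConsoleInvestor : IRegisterInvestorServiceCallback
    {
        public void StockUpdated(double price)
        {
            Console.WriteLine("Stock updated: " + price);
        }
    }

    public static class InvestorClient
    {
        // Builds a factory whose channels carry the callback instance to the service.
        public static DuplexChannelFactory<IRegisterInvestorService> CreateFactory(string address)
        {
            InstanceContext context = new InstanceContext(new ConsoleInvestor());
            return new DuplexChannelFactory<IRegisterInvestorService>(
                context, new NetTcpBinding(), new EndpointAddress(address));
        }

        public static void Main()
        {
            // Placeholder address; replace it with the address of your own endpoint.
            DuplexChannelFactory<IRegisterInvestorService> factory =
                CreateFactory("net.tcp://localhost:8523/Stock/register");
            IRegisterInvestorService proxy = factory.CreateChannel();

            Guid id = Guid.NewGuid();
            proxy.RegisterInvestor(id);
            Console.WriteLine("Observing the stock. Press any key to unsubscribe.");
            Console.ReadKey();

            // Deregister before closing so that the service stops calling this client.
            proxy.DeRegisterInvestor(id);
            ((IClientChannel)proxy).Close();
            factory.Close();
        }
    }
}
```

Keeping the Guid in a variable makes it possible to deregister with the same id that was used to register.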


Stock Exchange Implementation
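This is a simple console application which consumes StockExchangeService and sends the updated price to the service.
class Program
{
    static void Main(string[] args)
    {
        StockExchangeService.StockExcahangeServiceClient proxy = new StockExchangeService.StockExcahangeServiceClient();

        // Publish a new price; the service forwards it to every registered investor.
        proxy.UpdateStock(1000);

        // Close the proxy so that the channel is released cleanly.
        proxy.Close();
    }
}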


Testing
1.       Make sure the service is running.
2.       Start two investors by running the investor console application twice. Both investors register with the service and wait for a price change notification.
3.       Run the stock exchange application, which updates the price in the service. You will see that both investors are notified of the updated price.
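To check the expected flow of these steps without starting three processes, the same register, publish and notify sequence can be simulated in memory. This is only a sketch with plain C# objects in place of WCF channels; the names IStockCallback, RecordingInvestor, InProcessStock and Simulation are hypothetical and do not appear in the service code.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the callback contract, without any WCF attributes.
public interface IStockCallback
{
    void StockUpdated(double price);
}

// Investor that records every price it is notified of.
public class RecordingInvestor : IStockCallback
{
    public readonly List<double> Received = new List<double>();

    public void StockUpdated(double price)
    {
        Received.Add(price);
    }
}

// In-memory stand-in for the service: keeps subscribers and publishes to all of them.
public class InProcessStock
{
    private readonly Dictionary<Guid, IStockCallback> investors =
        new Dictionary<Guid, IStockCallback>();

    public void Register(Guid id, IStockCallback callback)
    {
        investors[id] = callback;
    }

    public void Deregister(Guid id)
    {
        investors.Remove(id);
    }

    public void UpdateStock(double price)
    {
        foreach (IStockCallback callback in investors.Values)
        {
            callback.StockUpdated(price);
        }
    }
}

public static class Simulation
{
    public static void Main()
    {
        InProcessStock stock = new InProcessStock();
        RecordingInvestor first = new RecordingInvestor();
        RecordingInvestor second = new RecordingInvestor();
        Guid secondId = Guid.NewGuid();

        // Step 2: two investors register.
        stock.Register(Guid.NewGuid(), first);
        stock.Register(secondId, second);

        // Step 3: the exchange publishes a price and both investors are notified.
        stock.UpdateStock(1000);

        // After deregistering, the second investor no longer receives updates.
        stock.Deregister(secondId);
        stock.UpdateStock(1010);

        Console.WriteLine("First investor notifications: " + first.Received.Count);
        Console.WriteLine("Second investor notifications: " + second.Received.Count);
    }
}
```

The real service differs in that each callback is a channel to another process, so a notification can fail or time out.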
Hope this helps
Vital.