Wednesday, March 2, 2011

Implement Publisher Subscriber model in WCF



Pre Requisites
                Readers must be having knowledge in creating basic WCF service and must know how to consume it.

Here I tried to illustrate how we can achieve publisher subscriber model in WCF
To explain this Let me take one common real world scenario Stock Exchange, Where investors register to Service and service will notify to all investors when stock got changed
My intention here is to make you understand how we can achieve publisher subscriber model in WCF not to solve above mentioned problem.
Duplex Operations
To achieve this we will use duplex operations.
Duplex operations allows client to send a request to service which contains information about endpoint at client which will be used to get communication from service
Service must be hosted with a binding that supports duplex communication. Following are the bindings that support duplex communication
·         Wsdualhttpbinding
·         Tcpbinding
·         Namedpipes

Let us start implementing duplex operations so that you will get more clarity on what I’m saying

To achieve our stock exchange scenario we need Service, Investor and Stock Exchange

Service
RegisterInvestorService: This service allows investors to register or de register to get updated when the stock price got changed
StockExchangeService: This service will be used by stock exchange to update price. When price got updated it will inform all the investors registered to the service

Investor
This consumes RegisterInvestorService and Registeritself to service and will get notifications from  RegisterInvestorService

Stock Exchange
                This consumes StockExchangeService and updates the price.


Service Implementation

Create Service Contracts as mentioned below
   [ServiceContract(CallbackContract = typeof(IRegisterInvestorServiceCallback))]
    public interface IRegisterInvestorService
    {
        [OperationContract(IsOneWay = true)]
        void RegisterInvestor(Guid investorid);

        [OperationContract(IsOneWay = true)]
        void DeRegisterInvestor(Guid investorid);
    }

    public interface IRegisterInvestorServiceCallback
    {
        [OperationContract(IsOneWay = true)]
        void StockUpdated(double price);
    }
    [ServiceContract()]
    public interface IStockExcahangeService
    {
        [OperationContract(IsOneWay = true)]
        void UpdateStock(double price);
    }

Let me Explain Purpose of each interface
IRegisterInvestorService: This will be useful for the investors who want to register or de register to get updated when the stock price got changed
INotifyInvestors: This will be the contract which will used to notify the clients when stock got updated. Client will implement this and take action when stock price got changed
IStockExcahangeService: This will be useful for stock exchange to inform service when price got changed. Then this service will notify all its clients
When you observe above code you will find 2 peculiar things
1.       CallbackContract:  says who is the callback contract for this service contract. So that this service contract will notify all clients who registered with service using this call back contract
2.       interface INotifyInvestors contains some Operation Contract but not having ServiceContract: This is called callback contract which will be implemented by the client (investors) and called by the service
Create below class in Service where it will store registered Investors information. This contains information about how we register and de register a investor and update all client who registerd with it when price got updated
    public static class MicrosoftStock
    {
        private static Dictionary<Guid, INotifyInvestors> registeredInvestors =
        new Dictionary<Guid, INotifyInvestors>();

        public static void UpdateStock(double price)
        {
            foreach (KeyValuePair<Guid, INotifyInvestors> obj in registeredInvestors)
            {
                obj.Value.StockUpdated(price);
            }
        }

        public static void RegisterInvestor(Guid investorid,
        INotifyInvestors notifyInvestor)
        {
            registeredInvestors.Add(investorid, notifyInvestor);
        }

        public static void DegisterInvestor(Guid investorid)
        {
            registeredInvestors.Remove(investorid);
        }
    }

In above code you might have observed one thing that i kept MicrosoftStock as static so that all investors can share the information.

Implement IRegisterInvestorService
    public class RegisterInvestorService : IRegisterInvestorService
    {
        public void RegisterInvestor(Guid investorid)
        {
            INotifyInvestors callback =    OperationContext.Current.GetCallbackChannel<INotifyInvestors>();
            MicrosoftStock.RegisterInvestor(investorid, callback);
        }

        public void DeRegisterInvestor(Guid id)
        {
            MicrosoftStock.DegisterInvestor(id);
        }
    }


Implement IStockExcahangeService
    public class StockExchangeService : IStockExcahangeService
    {
        public void UpdateStock(double price)
        {
            MicrosoftStock.UpdateStock(price);
        }
    }


Below is the config for the service which is common for any other WCF service except we are using wsdualHttpBinding which supports duplex communication as I mentioned earlier.
<system.serviceModel>
    <services>
      <service behaviorConfiguration="serviceBehavior" name="Stock.RegisterInvestorService">
        <endpoint address="wsDual" binding="wsDualHttpBinding" contract="Stock.IRegisterInvestorService" />
      </service>
      <service behaviorConfiguration="serviceBehavior" name="Stock.StockExchangeService">
        <endpoint address="basicHTTP" binding="basicHttpBinding" contract="Stock.IStockExcahangeService" />
      </service>
    </services>
    <behaviors>
      <serviceBehaviors>
        <behavior name="serviceBehavior">
          <serviceMetadata httpGetEnabled="true" />
          <serviceDebug includeExceptionDetailInFaults="false" />
        </behavior>
        <behavior name="">
          <serviceMetadata httpGetEnabled="true" />
          <serviceDebug includeExceptionDetailInFaults="false" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <serviceHostingEnvironment multipleSiteBindingsEnabled="true" />
  </system.serviceModel>


Now the service is ready.. I leave it to you how you host the service
Investor Implementation
This is simple console application which consumes RegisterInvestorService(hope you are aware of how to consume WCF service)

First Step in Investor is implementing  IRegisterInvestorServiceCallback, where we will get notifications. In implementation investor can take some action when price got changed

[CallbackBehavior(UseSynchronizationContext = false)]
class StockExcahageUpdates : IRegisterInvestorServiceCallback
{
    public void StockUpdated(double price)
    {
      Console.WriteLine("Stock Updted: " + price.ToString());
    }
}


Second step is register investor to the service and wait for notifications.

In this step you will observe one thing ServiceClient is accepting InstanceContext as parameter which is not required in traditional Request/Reply Service.
Using Instance context we will let service know to whom it needs to be notified when price got changed or we can say it as client’s end point to which service needs to be communicated

[CallbackBehavior(UseSynchronizationContext = false)]
class Program
{
    static void Main(string[] args)
    {
      InstanceContext callbackInstance = new InstanceContext(new StockExcahageUpdates());

      InvestorService.RegisterInvestorServiceClient proxy = new InvestorService.RegisterInvestorServiceClient(callbackInstance);

      proxy.RegisterInvestor(Guid.NewGuid());

      Console.WriteLine("Observing the Stock...");
      Console.ReadKey();
    }
}


Stock Exchange Implementation
This is simple Console Application which consumes StockExchangeService and update price to the service.
        static void Main(string[] args)
        {
            StockExchangeService.StockExcahangeServiceClient proxy = new StockExchangeService.StockExcahangeServiceClient();

            proxy.UpdateStock(1000);

                  }


Testing
1.       Service must be in running state
2.       Create couple investors by executing Invetors.exe couple of time. Then both investors will be registered to service and waiting for price change notification from service
3.       Run Stock exchange which will update the price in service, then you can observe  above 2 investors will get notified with updated price
Hope this helps
Vital.

11 comments:

  1. Hi, nice logics...can u provide the code at jollyabhi@gmail.com

    ReplyDelete
  2. I have given a link for complete source code at top of this article. Do you have any problems in downloading it?

    ReplyDelete
  3. Article Rocks!!!!! these kind of realtime examples are seldom found.
    will try to implement myself ... any doubts will get back to you.
    ---Quick question----
    Any reason in choosing "Wsdualhttpbinding" when you mentioned these three bindings -- Wsdualhttpbinding,Tcpbinding,Namedpipes ?

    ReplyDelete
  4. Thanks Ratan. No specific reason to select wsdualhttpbindign. Simplly selected one of the binding to demonstrate

    ReplyDelete
  5. Your interface names are off after the first code example...In the code you define three interfaces:
    1.IRegisterInvestorService
    2.IRegisterInvestorServiceCallback
    3.IStockExcahangeService

    But then mention a fourth interface INotifyInvestors which is explained what it means without having a definition, which leaves IRegisterInvestorServiceCallback defined with no explanation. Very confusing, but otherwise good article.

    ReplyDelete
  6. HI..
    verry good article but for a production environment make sure to add a lock each time you use your dictionary (read, add, remove).
    Also the MicrosoftStock class should be marked private with the methods in it as internal..
    This implics a more secure implementations.
    Regards

    ReplyDelete
  7. Hi. Great post! I downloaded your solution, run it and something went wrong. There's an error: All I can in the WCF Service Host was 3 Red Exclamation points! I know I could be missing something but I don't know what. I'm new in WCF as well as in Publish-Subscribe framework. I'd like your example be my guide in this. Please help. Thank you. Please response.

    ReplyDelete
  8. Good job, thank you.

    ReplyDelete
  9. Download link isn't working. Any chance of getting a copy of this project?

    ReplyDelete
  10. When I try tu run Investor1.exe i get the following error. Please advice.

    System.ServiceModel.AddressAlreadyInUseException was unhandled
    Message=HTTP could not register URL http://+:80/Temporary_Listen_Addresses/6f0a7d10-6d01-44fe-a0bc-24ec0782f942/ because TCP port 80 is being used by another application.
    Source=System.ServiceModel
    StackTrace:
    at System.ServiceModel.Channels.SharedHttpTransportManager.OnOpen()
    at System.ServiceModel.Channels.TransportManager.Open(TransportChannelListener channelListener)
    at System.ServiceModel.Channels.TransportManagerContainer.Open(SelectTransportManagersCallback selectTransportManagerCallback)
    at System.ServiceModel.Channels.TransportChannelListener.OnOpen(TimeSpan timeout)
    at System.ServiceModel.Channels.HttpChannelListener.OnOpen(TimeSpan timeout)
    at System.ServiceModel.Channels.CommunicationObject.Open(TimeSpan timeout)
    at System.ServiceModel.Channels.DatagramChannelDemuxer`2.OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)
    at System.ServiceModel.Channels.SingletonChannelListener`3.OnOpen(TimeSpan timeout)
    at System.ServiceModel.Channels.CommunicationObject.Open(TimeSpan timeout)
    at System.ServiceModel.Channels.LayeredChannelListener`1.OnOpen(TimeSpan timeout)
    at System.ServiceModel.Channels.CommunicationObject.Open(TimeSpan timeout)
    at System.ServiceModel.Channels.DatagramChannelDemuxer`2.OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)
    at System.ServiceModel.Channels.SingletonChannelListener`3.OnOpen(TimeSpan timeout)
    at System.ServiceModel.Channels.CommunicationObject.Open(TimeSpan timeout)
    at System.ServiceModel.Channels.InternalDuplexChannelFactory.OnOpen(TimeSpan timeout)
    at System.ServiceModel.Channels.CommunicationObject.Open(TimeSpan timeout)
    at System.ServiceModel.Channels.SecurityChannelFactory`1.OnOpen(TimeSpan timeout)
    at System.ServiceModel.Channels.CommunicationObject.Open(TimeSpan timeout)
    at System.ServiceModel.Channels.ReliableChannelFactory`2.OnOpen(TimeSpan timeout)
    at System.ServiceModel.Channels.CommunicationObject.Open(TimeSpan timeout)
    at System.ServiceModel.Channels.ServiceChannelFactory.TypedServiceChannelFactory`1.OnOpen(TimeSpan timeout)
    at System.ServiceModel.Channels.CommunicationObject.Open(TimeSpan timeout)
    at System.ServiceModel.ChannelFactory.OnOpen(TimeSpan timeout)
    at System.ServiceModel.Channels.CommunicationObject.Open(TimeSpan timeout)
    at System.ServiceModel.ChannelFactory.EnsureOpened()
    at System.ServiceModel.DuplexChannelFactory`1.CreateChannel(InstanceContext callbackInstance, EndpointAddress address, Uri via)
    at System.ServiceModel.DuplexChannelFactory`1.CreateChannel(EndpointAddress address, Uri via)
    at System.ServiceModel.ChannelFactory`1.CreateChannel()
    at System.ServiceModel.DuplexClientBase`1.CreateChannel()
    at System.ServiceModel.ClientBase`1.CreateChannelInternal()
    at System.ServiceModel.ClientBase`1.get_Channel()
    at Investor1.InvestorService.RegisterInvestorServiceClient.RegisterInvestor(Guid investorid) in E:\Dotnet Projects\ObserverPatternWCF\Investor1\Service References\InvestorService\Reference.cs:line 61
    at Investor1.Program.Main(String[] args) in E:\Dotnet Projects\ObserverPatternWCF\Investor1\Program.cs:line 17
    InnerException: System.Net.HttpListenerException
    Message=The process cannot access the file because it is being used by another process
    Source=System
    ErrorCode=32
    NativeErrorCode=32
    StackTrace:
    at System.Net.HttpListener.AddAllPrefixes()
    at System.Net.HttpListener.Start()
    at System.ServiceModel.Channels.SharedHttpTransportManager.OnOpen()
    InnerException:

    ReplyDelete