Rx-based Publisher

The Rx-based publisher lets you publish events to a topic in a realm of a WAMP router through the Reactive Extensions (Rx) IObserver API.

Basic usage

private static async Task Main()
{
    const string serverAddress = "ws://127.0.0.1:8080/ws";

    DefaultWampChannelFactory channelFactory = new DefaultWampChannelFactory();

    IWampChannel channel = channelFactory.CreateJsonChannel(serverAddress, "realm1");

    await channel.Open().ConfigureAwait(false);

    IWampRealmProxy realm = channel.RealmProxy;

    ISubject<int> subject =
        realm.Services.GetSubject<int>("com.myapp.topic1");

    // Publishes 5 to com.myapp.topic1
    subject.OnNext(5);
}

GetSubject overloads

When GetSubject is called with a generic type argument, the Arguments list of the published message (and of the EVENT message that subscribers receive) contains exactly one element: the value passed to OnNext.
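
As a rough illustration of the single-argument case, the sketch below builds the PUBLISH message that such a call would put on the wire, following the message layout in the WAMP specification. The helper buildPublish and the request id 1 are hypothetical and exist only for this example; the exact ids and options WampSharp sends are not shown here.

```javascript
// Hypothetical helper (not part of WampSharp or AutobahnJS): builds a WAMP
// PUBLISH message as a plain array, using the layout from the WAMP spec:
//   [PUBLISH, Request|id, Options|dict, Topic|uri, Arguments|list, ArgumentsKw|dict]
const PUBLISH = 16; // message type code defined by the WAMP specification

function buildPublish(requestId, topic, args, kwargs, options) {
    const message = [PUBLISH, requestId, options || {}, topic];
    const hasArgs = Array.isArray(args) && args.length > 0;
    const hasKwargs = !!kwargs && Object.keys(kwargs).length > 0;

    // Trailing empty Arguments / ArgumentsKw fields may be left out.
    if (hasArgs || hasKwargs) {
        message.push(args || []);
    }
    if (hasKwargs) {
        message.push(kwargs);
    }
    return message;
}

// Roughly what subject.OnNext(5) on an ISubject<int> corresponds to:
const single = buildPublish(1, "com.myapp.topic1", [5]);
console.log(JSON.stringify(single)); // [16,1,{},"com.myapp.topic1",[5]]
```

The router then forwards the same Arguments list to subscribers in an EVENT message.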

To send further arguments, one can use the IWampEventValueTupleConverter or IWampSubject overloads. These are described in the following sections.

Tuples support

The IWampEventValueTupleConverter interface converts in both directions: from an IWampSerializedEvent instance to a specified tuple type, and from a tuple instance to an IWampEvent.

Luckily, implementing this interface only requires deriving from WampEventValueTupleConverter<> and specifying the desired tuple type. The class body can stay empty.

(This might seem a bit odd, but that’s the best way I’m aware of for preserving ValueTuple element names after compilation)

Then pass an instance of your converter to the GetSubject overload of IWampRealmServiceProvider that takes the topic’s URI and an IWampEventValueTupleConverter. It returns an ISubject<> of your tuple type.

public static async Task Main()
{
	DefaultWampChannelFactory factory = new DefaultWampChannelFactory();

	IWampChannel channel =
		factory.CreateJsonChannel("ws://localhost:8080/ws", "realm1");

	await channel.Open();

	ISubject<(int, int)> topic1Subject =
		channel.RealmProxy.Services.GetSubject
				("com.myapp.topic1",
				new MyPositionalTupleEventConverter());

	ISubject<(int number1, int number2, string c, ComplexContract d)> topic2Subject =
		channel.RealmProxy.Services.GetSubject
				("com.myapp.topic2",
				new MyKeywordTupleEventConverter());

	IObservable<int> timer =
		Observable.Timer(TimeSpan.FromMilliseconds(0),
						 TimeSpan.FromMilliseconds(1000))
						 .Select((x, i) => i);

	Random random = new Random();

	IDisposable disposable =
		timer.Subscribe(value =>
		{
			topic1Subject.OnNext((random.Next(0, 100), 23));
			topic2Subject.OnNext((random.Next(0, 100), 23, "Hello",
								  new ComplexContract()
								  {
									  Counter = value,
									  Foo = new int[] { 1, 2, 3 }
								  }));
		});

	// This line is required in order to release the WebSocket thread, otherwise it will be blocked by the following Console.ReadLine() line.
	await Task.Yield();

	Console.ReadLine();
}

public class MyPositionalTupleEventConverter : WampEventValueTupleConverter<(int, int)>
{
}

public class MyKeywordTupleEventConverter : WampEventValueTupleConverter<(int number1, int number2, string c, ComplexContract d)>
{
}

public class ComplexContract
{
    [JsonProperty("counter")]
    public int Counter { get; set; }

    [JsonProperty("foo")]
    public int[] Foo { get; set; }

    public override string ToString()
    {
        return string.Format("counter: {0}, foo: [{1}]",
            Counter,
            string.Join(", ", Foo));
    }
}

The following JavaScript code can consume events published by the code above:

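// Assumes AutobahnJS is loaded as `autobahn`; the URL and realm mirror the C# code above.
var connection = new autobahn.Connection({url: 'ws://localhost:8080/ws', realm: 'realm1'});

function on_topic1(args, kwargs) {
    console.log("com.myapp.topic1: Got event:", args, kwargs);
}

function on_topic2(args, kwargs) {
    console.log("com.myapp.topic2: Got event:", args, kwargs);
}

// Subscribe once the session has been established.
connection.onopen = function (session) {
    session.subscribe('com.myapp.topic1', on_topic1);
    session.subscribe('com.myapp.topic2', on_topic2);
};

connection.open();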

This example is based on this AutobahnJS sample.

IWampSubject

Calling GetSubject without a generic type returns an IWampSubject, which is an IObserver of IWampEvent. IWampEvent is an interface representing an outgoing WAMP PUBLISH message. Its properties correspond to the fields of the PUBLISH message.
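
To make that correspondence concrete, here is a minimal sketch of how the three properties used in this section (Options, Arguments, ArgumentsKeywords) would line up with the fields of a PUBLISH message as laid out in the WAMP specification. The object shape, the toPublishMessage helper and the request id 7 are assumptions made for illustration, not WampSharp APIs. disclose_me is the option key the WAMP advanced profile defines for publisher disclosure.

```javascript
// Illustrative only: a plain-object stand-in for the IWampEvent properties
// and a hypothetical helper that arranges them in PUBLISH field order.
const PUBLISH = 16; // message type code defined by the WAMP specification

function toPublishMessage(requestId, topic, wampEvent) {
    return [
        PUBLISH,
        requestId,
        wampEvent.options,           // Options|dict
        topic,                       // Topic|uri
        wampEvent.arguments,         // Arguments|list
        wampEvent.argumentsKeywords  // ArgumentsKw|dict
    ];
}

const wampEvent = {
    options: { disclose_me: true },
    arguments: [42, 23],
    argumentsKeywords: { c: "Hello", d: { counter: 0, foo: [1, 2, 3] } }
};

const message = toPublishMessage(7, "com.myapp.topic2", wampEvent);
console.log(JSON.stringify(message));
```

A subscriber's handler would then see the list as args and the dictionary as kwargs.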

private static async Task Main()
{
    const string serverAddress = "ws://127.0.0.1:8080/ws";

    DefaultWampChannelFactory channelFactory = new DefaultWampChannelFactory();

    IWampChannel channel = channelFactory.CreateJsonChannel(serverAddress, "realm1");

    await channel.Open().ConfigureAwait(false);

    IWampRealmProxy realm = channel.RealmProxy;

    IWampSubject subject =
        realm.Services.GetSubject("com.myapp.topic2");

    int counter = 0;

    IObservable<long> timer =
        Observable.Timer(TimeSpan.FromMilliseconds(0),
                         TimeSpan.FromMilliseconds(1000));

    Random random = new Random();

    IDisposable disposable =
        timer.Subscribe(x =>
        {
            var obj =
                new
                {
                    counter = counter,
                    foo = new int[] {1, 2, 3}
                };

            WampEvent @event = new WampEvent()
            {
                Options = new PublishOptions {DiscloseMe = true},
                Arguments = new object[] {random.Next(0, 100), 23},
                ArgumentsKeywords = new Dictionary<string, object>
                {
                    {"c", "Hello"},
                    {"d", obj}
                }
            };

            subject.OnNext(@event);
            Console.WriteLine("events published");

            counter += 1;
        });

    // This line is required in order to release the WebSocket thread, otherwise it will be blocked by the following Console.ReadLine() line.
    await Task.Yield();

    Console.ReadLine();
}

This example is based on this AutobahnJS sample.