Rx-based Subscriber

Rx-based Subscriber allows you to subscribe to events of a topic of a WAMP router realm, using Reactive Extensions Observable api.

Basic usage

private static async Task RunAsync()
{
    DefaultWampChannelFactory factory = new DefaultWampChannelFactory();

    const string serverAddress = "ws://127.0.0.1:8080/ws";

    IWampChannel channel =
        factory.CreateJsonChannel(serverAddress, "realm1");

    await channel.Open().ConfigureAwait(false);

    IWampRealmProxy realmProxy = channel.RealmProxy;

    IDisposable subscription =
        realmProxy.Services.GetSubject<int>("com.myapp.topic1")
                  .Subscribe(x =>
                  {
                      Console.WriteLine("Got Event: " + x);
                  });

    // Call subscription.Dispose(); to unsubscribe.
}

GetSubject overloads

GetSubject allows you to subscribe to a topic of a WAMP realm, given the type of the received events.

If a generic type is specified, only the first argument in the Arguments parameter of the EVENT message is deserialized.

To handle other parameters of the EVENT message, one can use the IWampEventValueTupleConverter or IWampSubject overloads. These are described in the following sections.

Tuples support

The interface IWampEventValueTupleConverter is responsible for converting a IWampSerializedEvent instance to a specified tuple and a specified tuple instance to a IWampEvent.

Luckily enough, in order to implement this interface, it suffices to derive from WampEventValueTupleConverter<> and specify the desired tuple type. Nothing else needs to be done.

(This might seem a bit odd, but that’s the best way I’m aware of for preserving ValueTuple element names after compilation)

Then, just pass an instance of your IWampEventValueTupleConverter to the overload of IWampRealmServiceProvider’s GetSubject method, which receives the topic’s uri and an instance of IWampEventValueTupleConverter, in order to receive a ISubject<> instance of your desired tuple type.

public async Task Run()
{
    DefaultWampChannelFactory factory = new DefaultWampChannelFactory();

    IWampChannel channel =
        factory.CreateJsonChannel("ws://localhost:8080/ws", "realm1");

    await channel.Open();

    ISubject<(int, int)> topic1Subject =
        channel.RealmProxy.Services.GetSubject
                ("com.myapp.topic1",
                new MyPositionalTupleEventConverter());

    topic1Subject.Subscribe(value =>
    {
        (int number1, int number2) = value;
        Console.WriteLine($">com.myapp.topic1: Got event: number1:{number1}, number2:{number2}");
    });

    ISubject<(int number1, int number2, string c, ComplexContract d)> topic2Subject =
        channel.RealmProxy.Services.GetSubject
                ("com.myapp.topic2",
                new MyKeywordTupleEventConverter());

    topic2Subject.Subscribe(value =>
    {
        (int number1, int number2, string c, ComplexContract d) = value;
        Console.WriteLine($">com.myapp.topic2: Got event: number1:{number1}, number2:{number2}, c:{c}, d:{d}");
    });
}

public class MyPositionalTupleEventConverter : WampEventValueTupleConverter<(int, int)>
{
}

public class MyKeywordTupleEventConverter : WampEventValueTupleConverter<(int number1, int number2, string c, ComplexContract d)>
{
}

public class ComplexContract
{
    [JsonProperty("counter")]
    public int Counter { get; set; }

    [JsonProperty("foo")]
    public int[] Foo { get; set; }

    public override string ToString()
    {
        return string.Format("counter: {0}, foo: [{1}]",
            Counter,
            string.Join(", ", Foo));
    }
}

This code can consume events published by the following Javascript code:

var counter = 0;

setInterval(function () {
    var obj = {'counter': counter, 'foo': [1, 2, 3]};

    session.publish('com.myapp.topic1', [randint(0, 100), 23], {});
    session.publish('com.myapp.topic2', [], {number1: randint(0, 100), number2: 23, c: "Hello", d: obj});

    counter += 1;

    console.log("events published");
}, 1000);

This example is based on this AutobahnJS sample.

IWampSubject

Specifying no generic type to the GetSubject method will a return a IWampSubject, that is a IObservable of a IWampSerializedEvent. IWampSerializedEvent is an interface representing an incoming WAMP EVENT message. It has properties of type ISerializedValue that can be deserialized.

public static void Main(string[] args)
{
    IDisposable disposable = RunAsync().Result;
    Console.ReadLine();
    disposable.Dispose();
}

private static async Task<IDisposable> RunAsync()
{
    DefaultWampChannelFactory factory = new DefaultWampChannelFactory();

    const string serverAddress = "ws://127.0.0.1:8080/ws";

    IWampChannel channel =
        factory.CreateJsonChannel(serverAddress, "realm1");

    await channel.Open().ConfigureAwait(false);

    IWampRealmProxy realmProxy = channel.RealmProxy;

    IWampSubject topic =
        realmProxy.Services.GetSubject("com.myapp.topic2");

    IDisposable disposable =
        topic.Subscribe(OnTopic2);

    return disposable;
}

private static void OnTopic2(IWampSerializedEvent serializedEvent)
{
    int[] arguments =
        serializedEvent.Arguments.Select(argument => argument.Deserialize<int>())
         .ToArray();

    string c =
        serializedEvent.ArgumentsKeywords["c"].Deserialize<string>();

    ComplexContract d =
        serializedEvent.ArgumentsKeywords["d"].Deserialize<ComplexContract>();

    var deserializedArguments =
        new
        {
            arguments,
            argumentsKeywords = new
            {
                c,
                d
            }
        };

    Console.WriteLine("Got event: args: [{0}], kwargs: {{ {1} }}",
                      string.Join(", ", deserializedArguments.arguments),
                      deserializedArguments.argumentsKeywords);
}

public class ComplexContract
{
    [JsonProperty("counter")]
    public int Counter { get; set; }

    [JsonProperty("foo")]
    public int[] Foo { get; set; }

    public override string ToString()
    {
        return $"counter: {Counter}, foo: [{string.Join(", ", Foo)}]";
    }
}

This example is based on this AutobahnJS sample.