Data Sharing

Overview

The Pub/Sub API is accessible through io.Bus.

Publish

To All Apps

To publish a message on a specific topic to all subscribed apps, use the Publish() method. It accepts the message topic and the data to publish as arguments:

var topic = "stocks";
var data = new { RIC = "AAPL.O" };

await io.Bus.Publish(topic, data);

To Specific Apps

Use the message options builder as a third argument of Publish() to make an app publish a message only to specific apps that have subscribed to a topic.

The example below demonstrates how to publish a message to another app (or to multiple instances of it) with a specific name:

var topic = "stocks";
var data = new { RIC = "AAPL.O" };
var appName = "app-name";

await io.Bus.Publish(topic, data, (messageOptions) =>
{
    messageOptions.WithTargetOptions(target => target.WithApplicationName(appName));
}

The Pub/Sub API compares the appName argument with the identity of each app subscribed to the topic and delivers the message only to subscribers with a matching app name.

The example below demonstrates how to publish messages with a specific routing key:

var topic = "stocks";
var data = new { RIC = "AAPL.O" };
var routingKey = "portfolio";

await io.Bus.Publish(topic, data, options => options.WithRoutingKey(routingKey));

The Pub/Sub API delivers messages with a routing key to all subscribers with the same routing key and to the ones with no routing key.

Subscribe

Messages from Any App

To subscribe for messages from all apps on a specific topic, use the Subscribe() method. Upon successful subscription, it returns a subscription object of type IDisposable. Use its Dispose() method to stop receiving messages on that topic.

Provide the topic on which you want to receive messages and a handler function for the messages:

var topic = "stocks";

IDisposable stocksSubscription = await io.Bus.Subscribe(topic, (message) =>
{
    Console.WriteLine($"Received data: {message.Data}");
});

// Closing the subscription.
stocksSubscription.Dispose();

Messages from Specific Apps

The example below demonstrates how to subscribe for messages with a specific routing key:

var topic = "stocks";
var routingKey = "portfolio";

await io.Bus.Subscribe(topic, (message) =>
{
    Console.WriteLine($"Received data: {message.Data}");
},
options => options.WithRoutingKey(routingKey));

The Pub/Sub API invokes the handler only for messages with a matching routing key and for the ones with no routing key.

Typed Message Buses

The .NET Pub/Sub API allows you to create typed message buses if you want to ensure that the publisher or the subscriber pushes or receives specific type of data.

To create a typed message bus, use the GetTypedBus() method:

// Push or receive messages containing only integer data.
var intBus = io.Bus.GetTypedbus<int>();

// Push or receive messages containing only data of type `AppState`.
var appStateBus = io.Bus.GetTypesBus<AppState>();

Publish

To publish messages on a typed bus:

var appStateBus = io.Bus.GetTypesBus<AppState>();
var topic = "app-state";
var data =  new AppState { DarkThemeOn = true };

await appStateBus.Publish(topic, data);

Subscribe

To subscribe for messages on a typed bus:

var intBus = io.Bus.GetTypedbus<int>();
var topic = "quantity";

await intBus.Subscribe(topic, (message) =>
{
    Console.WriteLine($"Received data: {message.Data}");
});

Or:

var topic = "quantity";

await io.Bus.Subscribe<int>(topic, (message) =>
{
    Console.WriteLine($"Received data: {message.Data}");
});

Untyped Data

The data which which fails to deserialize to the required type is accessible via the UntypedData property of the received message. Use the HasDeserializationError Boolean flag to determine whether a deserialization error has occurred. Use the WithIgnoreDeserializationErrors() method of the message options builder to specify whether the message handler should be invoked with the untyped data:

await intBus.Subscribe(topic, (message) =>
{
    if (message.HasDeserializationError)
    {
        Console.WriteLine($"Received wrong data type: {message.UntypedData}");
    }
    else
    {
        Console.WriteLine($"Received correct data type: {message.Data}")
    }
},
options => options.WithIgnoreDeserializationErrors(true));