Data Sharing
Overview
The Pub/Sub API is accessible through io.Bus.
Publish
To All Apps
To publish a message on a specific topic to all subscribed apps, use the Publish() method. It accepts the message topic and the data to publish as arguments:
var topic = "stocks";
var data = new { RIC = "AAPL.O" };
await io.Bus.Publish(topic, data);
To Specific Apps
To publish a message only to specific apps that have subscribed to a topic, pass a message options builder as a third argument to Publish().
The example below demonstrates how to publish a message to another app with a specific name (or to multiple instances of it):
var topic = "stocks";
var data = new { RIC = "AAPL.O" };
var appName = "app-name";
await io.Bus.Publish(topic, data, (messageOptions) =>
{
    messageOptions.WithTargetOptions(target => target.WithApplicationName(appName));
});
The Pub/Sub API compares the appName argument with the identity of each app subscribed to the topic and delivers the message only to subscribers with a matching app name.
The example below demonstrates how to publish messages with a specific routing key:
var topic = "stocks";
var data = new { RIC = "AAPL.O" };
var routingKey = "portfolio";
await io.Bus.Publish(topic, data, options => options.WithRoutingKey(routingKey));
The Pub/Sub API delivers messages with a routing key to all subscribers with the same routing key and to the ones with no routing key.
Subscribe
Messages from Any App
To subscribe for messages from all apps on a specific topic, use the Subscribe() method. Upon successful subscription, it returns a subscription object of type IDisposable. Use its Dispose() method to stop receiving messages on that topic.
Provide the topic on which you want to receive messages and a handler function for the messages:
var topic = "stocks";
IDisposable stocksSubscription = await io.Bus.Subscribe(topic, (message) =>
{
    Console.WriteLine($"Received data: {message.Data}");
});
// Closing the subscription.
stocksSubscription.Dispose();
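Because the subscription is an IDisposable, it can be convenient to tie its lifetime to a block of work so that Dispose() is called even if that work throws. The following is a minimal sketch of that pattern in plain C#. The SubscriptionScope class and the RunWithSubscription() method are hypothetical names introduced here for illustration and aren't part of the Pub/Sub API:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of the Pub/Sub API.
public static class SubscriptionScope
{
    // Creates a subscription, runs the supplied work, and always
    // disposes the subscription afterwards, even if the work throws.
    public static async Task RunWithSubscription(
        Func<Task<IDisposable>> subscribe,
        Func<Task> work)
    {
        IDisposable subscription = await subscribe();

        try
        {
            await work();
        }
        finally
        {
            // Stop receiving messages on the topic.
            subscription.Dispose();
        }
    }
}
```

Assuming the topic and handler from the example above, the helper could be called with `async () => await io.Bus.Subscribe(topic, handler)` as the first argument and the work to perform while subscribed as the second.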
Messages from Specific Apps
The example below demonstrates how to subscribe for messages with a specific routing key:
var topic = "stocks";
var routingKey = "portfolio";
await io.Bus.Subscribe(topic, (message) =>
{
    Console.WriteLine($"Received data: {message.Data}");
},
options => options.WithRoutingKey(routingKey));
The Pub/Sub API invokes the handler only for messages with a matching routing key and for the ones with no routing key.
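Taken together, the routing key rules stated in the Publish and Subscribe sections can be modeled as a single predicate. The following sketch only illustrates the described behavior and isn't how the Pub/Sub API is implemented. The RoutingRules class and the ShouldDeliver() method are hypothetical names, representing a missing routing key as null is an assumption, and so is the case-sensitive comparison:

```csharp
using System;

// Hypothetical model of the documented delivery rule, not part of the Pub/Sub API.
public static class RoutingRules
{
    // A null key models a message or a subscription created without a routing key.
    public static bool ShouldDeliver(string? messageKey, string? subscriberKey)
    {
        // Messages without a routing key reach every subscriber,
        // subscribers without a routing key receive every message,
        // otherwise the two keys must match (case-sensitivity is assumed).
        return messageKey is null
            || subscriberKey is null
            || string.Equals(messageKey, subscriberKey, StringComparison.Ordinal);
    }
}
```

Under this model, a message published with the "portfolio" routing key reaches subscribers with the "portfolio" routing key and subscribers with no routing key, but not subscribers with a different routing key.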
Typed Message Buses
The .NET Pub/Sub API allows you to create typed message buses if you want to ensure that the publisher pushes, or the subscriber receives, only a specific type of data.
To create a typed message bus, use the GetTypedBus() method:
// Push or receive messages containing only integer data.
var intBus = io.Bus.GetTypedBus<int>();
// Push or receive messages containing only data of type `AppState`.
var appStateBus = io.Bus.GetTypedBus<AppState>();
Publish
To publish messages on a typed bus:
var appStateBus = io.Bus.GetTypedBus<AppState>();
var topic = "app-state";
var data = new AppState { DarkThemeOn = true };
await appStateBus.Publish(topic, data);
Subscribe
To subscribe for messages on a typed bus:
var intBus = io.Bus.GetTypedBus<int>();
var topic = "quantity";
await intBus.Subscribe(topic, (message) =>
{
    Console.WriteLine($"Received data: {message.Data}");
});
Or:
var topic = "quantity";
await io.Bus.Subscribe<int>(topic, (message) =>
{
    Console.WriteLine($"Received data: {message.Data}");
});
Untyped Data
Data that fails to deserialize to the required type is accessible via the UntypedData property of the received message. Use the HasDeserializationError Boolean flag to determine whether a deserialization error has occurred. Use the WithIgnoreDeserializationErrors() method of the message options builder to specify whether the message handler should be invoked with the untyped data:
await intBus.Subscribe(topic, (message) =>
{
    if (message.HasDeserializationError)
    {
        Console.WriteLine($"Received wrong data type: {message.UntypedData}");
    }
    else
    {
        Console.WriteLine($"Received correct data type: {message.Data}");
    }
},
options => options.WithIgnoreDeserializationErrors(true));