Data Sharing

Method Registration

Registering Methods

To offer a method to other apps, call io.interop().register(), passing the method definition and a callback to handle client invocations.

io.interop().<Map<String, Object>, Map<String, Object>>register(
        MethodDefinition.builder("Sum").withSignature("int a, int b", "int answer").build(),
        (arg, caller) -> {
            int a = (Integer) arg.get("a");
            int b = (Integer) arg.get("b");
            return Collections.singletonMap("answer", a + b);

Once a method is registered, it can be invoked from any interop-enabled app (web, native or Java).

Method Definition

The method definition describes the Interop method your app is offering. It has the following properties:

Name Description
name Required. The name of the method, e.g. OpenClientPerformance()
displayName The actual name of the method that should be used in UI apps, e.g. "Open Client Performance"
description Description of what the method does, useful for documentation purposes and for UI clients, e.g. "Launches or activates the Client Performance app"
objectTypes The entities this method is meant to operate on, e.g. party, instrument, order, etc.

It is a good idea to specify displayName and description when defining a method. They can be used by a generic UI or by your own apps.

Method Signature

The signature of a method is a comma-delimited string of arguments, defined as follows:

type [array_modifier] [optional_modifier] argument_name [composite_schema] [description]

Where type is one of: Bool, Int, Long, Double, String, DateTime, Composite ("Composite" is explained below) and is case-insensitive, so bool and BOOL are the same thing.


Signature Explanation
String pId, String? dynamicsId pId is required, dynamicsId is optional
String branchCode, String[] gIds branchCode and gIds are required, gIds is an array of strings
Composite: { String first, String last } name name is a composite argument, and its schema is defined by 2 required strings - first and last

Composite is a structure which contains one or more fields of type:

  • scalar (bool, int, etc.)
  • array of scalars
  • a composite (nested structure)
  • an array of composites

Using Composite you can define almost any non-recursive, non-self-referential structure.

Asynchronous Results

You can obtain asynchronous results by using the registerAsync() method to register methods, which return a Future object:

io.interop().registerAsync("getQuote", (arg, caller) -> {
    CompletableFuture<Map<String, Object>> future = doSomethingAsync(arg);
    return future;

The doSomethingAsync() method above returns a Future object.

Method Invocation

Invoking Methods

To invoke a method offered by other apps, call io.interop().invoke(), passing the method name and arguments. Then use the returned Future to receive result or an error.

Map<String, Object> arg = new HashMap<>();
arg.put("a", 37);
arg.put("b", 5);
io.interop().invoke("Sum", arg)
        .thenAccept(result ->
                                    .ifPresent(r -> System.out.println(r.get("answer"))))

Multiple Responses

Invoking a method on multiple Interop instances produces multiple responses.

This is how you can iterate over all responses:

result.forEach(r -> {
    if (r.getStatus().isSuccess()) {
        System.out.printf("success:%s%n", r.getReturned().orElseGet(Collections::emptyMap));
    } else {
        System.out.printf("error:%s%n", r.getStatus().message().orElse(null));

When the invocation result has multiple responses, calling hasMultipleResponses() on the result will return true.


Discovering Methods

To list all available methods from all servers:


Searching for Methods

Live Search Query

io.Connect Java offers a fluent API for finding Interop methods. It enables you to make a live query when you search for methods to invoke. This means that once you have an initial result from the search query, methods will be automatically added to/removed from that result when they become available or, respectively, unavailable.

The query starts from the Search class. You can use different methods of the Search class to specify criteria for the query:

// here we are creating a Scheduler that we need for the example
// but you can also use an already created Scheduler
Scheduler scheduler = Scheduler.of(new Timer("search-scheduler", true));

        .method(ReifiedType.OBJECT_MAP)     // return type of the method
        .name("Sum")                        // name of the method
        .localOnly(true)                    // offered only by local or by both remote and local servers
        .server(io.interop().instance())  // specify a server offering the method (optional)
        .in(io.interop(), scheduler, Duration.ofSeconds(10))

The in() method specifies the Interop instance (io.interop()) which the query will use to track method added or removed events. The scheduler argument is of type Scheduler and is used to make the query asynchronous and non-blocking. The last argument is a timeout for the search query.

Instead of name(), you can also use the nameMatches() method to pass a regex string or a Pattern for the name of the method(s) you are interested in.

Local & Remote Methods

You can look for methods offered both by local and remote servers by using the localOnly() method. Pass a Boolean value to it to specify whether you need methods offered only by local servers or by both local and remote servers. The method localOnly() defaults to true, so if you omit it in the search, only methods offered by local servers will be returned.

Here is an example search for local methods:

        .localOnly(true) //this can be omitted as it defaults to true
        .in(io.interop(), scheduler, Duration.ofSeconds(10))
        .invoke(Collections.emptyMap(), // T args
        .whenComplete((result, error) -> {
            //logic for when the invocation completes

Discovering Servers

To list all servers offering methods:




Your app can publish events that can be observed by other apps and can provide real-time data (e.g., market data, news alerts, notifications, etc.) to other apps by publishing an Interop stream. It can also receive and react to these events and data by creating an Interop stream subscription.

Apps that create and publish to Interop streams are called "publishers", and apps that subscribe to Interop Streams are called "subscribers". An app can be both.

Subscribing to Streams

Subscribing to a stream is achieved by invoking io.interop().stream():

                Collections.singletonMap("symbol", "ORCL"))
        .thenAccept(stream -> stream.subscribe(new StreamSubscriber<Map<String, Object>>() {
            public void onData(ServerMethod method, Map<String, Object> data) {
                // do something with the data

Closing Stream Subscriptions

To close a stream subscription, invoke the close() or closeAsync() method on the subscription reference returned by the subscribe() method:

AsynchronousCloseable subscription =
        stream.subscribe(new StreamSubscriber<Map<String, Object>>() {});

Subscription Closed Notification

At any time, a stream subscription can be closed either because the publisher has shut down or due to an error.

stream.subscribe(new StreamSubscriber<Map<String, Object>>()
    public void onSubscribe(StreamSubscription subscription)
        subscription.onClose().thenRun(() -> {
            // called when the subscription is closed

    public void onFailed(ServerMethod method, String reason)
        // called if the subscription request is rejected

Publishing Data

To start publishing data, you need to register an Interop stream by calling io.interop().register() and providing a method definition and stream subscription request handler.

        .thenAccept(stream -> {
            Map<String, Object> data =
            // will send data to all branches, as no branch is specified

StreamSubscriptionRequestHandler.accept() will accept all subscription requests on the default branch (see Multicasting Data Using Branches below).

Handling Subscription Requests

To control how your app accepts or rejects stream subscription requests, specify a custom handler. The handler receives a StreamSubscriptionRequest as a second argument and must return a StreamConsumer instance by invoking the accept(), acceptOn() or reject() methods of the request.

        .<Map<String, Object>>register(
                request -> {
                    String app = request.getCaller().getApplication();
                    String symbol = (String) request.getArg().get("Symbol");

                    if (symbol != null) {
                        System.out.printf("Accepting %s subscription on %s%n", app, symbol);
                        return request.acceptOn(symbol);
                    } else {
                        System.out.printf("Rejecting %s symbol not specified%n", app);
                        return request.reject("Symbol not specified");

Handling New or Removed Consumers

To track when a consumer is added and/or removed, StreamSubscriptionRequestHandler provides the onAdded() and onRemoved() methods in which you can compose callbacks.

                        .<Map<String, Object>>accept((arg, caller) -> (String) arg.get("Symbol"))
                        .onAdded(consumer -> {
                            // called when new consumer subscription is accepted
                        .onRemoved(consumer -> {
                            // called when a consumer subscription is removed

In order to use these methods, you need an instance of type StreamSubscriptionRequestHandler. One way to achieve this, is to use one of the provided static factory methods instead of a lambda.

The following fragment creates a handler that is equivalent (if you ignore logging) with the one provided in the previous section:

        .<Map<String, Object>>accept((arg, caller) -> (String) arg.get("Symbol"))

The StreamConsumer reference can be used to:

  • access the request arguments and the caller instance - getArg() and getCaller();
  • inspect the branch on which the consumer was accepted - getBranch();
  • push data directly to a consumer (unicast) - send();
  • close the subscription forcefully - close() or closeAsync();

Multicasting Data Using Branches

A single stream supports one or multiple named sub-streams that are called branches. In cases where it isn't necessary for a stream to be split into multiple sub-streams, a default branch is used.

        .thenAccept(stream -> {
            Map<String, Object> data =
            // data will be sent to branches with names "Consumers" and "Providers"
            stream.send(data, "Consumers", "Providers");

The stream reference is of type StreamProducer and can be used to:

  • send data to specific stream branch(es). If no branch is specified, that data is sent to all branches:
stream.send(data, "Consumers", "Providers");
  • list all available branches:
Map<String, StreamBranch> allBranches = stream.getBranches();
  • list all stream consumers, regardless of the branch they are on:
List<StreamConsumer<?>> allConsumers = stream.getConsumers();

If your stream publishing code uses branches (e.g., creates a branch for each unique set of subscription arguments and associates consumers with that branch), whenever a data arrives from your underlying source, you can use that branch to publish data instead of manually iterating over all consumers to send data to the interested clients.

String branchKey = "Providers";
StreamBranch branch = stream.getBranches().get(branchKey);

if (branch != null) {
    System.out.printf("Pushed data to all subscribers on branch \"%s\"!%n", branch.getKey());
} else {
    System.out.printf("Branch \"%s\" doesn't exist!%n", branchKey);

To close all subscriptions on this branch, call closeAsync() on the StreamBranch instance:


Stream Discovery

Streams are special Interop methods on which isSupportsStreaming() returns true. You can use the Interop Discovery to find available streams.