Skip to content

Commit

Permalink
[#11004] Removed legacy channel api supports
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed May 13, 2024
1 parent 9a66857 commit fef2e1a
Show file tree
Hide file tree
Showing 31 changed files with 182 additions and 612 deletions.
85 changes: 85 additions & 0 deletions channel/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@

## Purpose

This module abstracts the channel concept which are usually implemented by Redis, or Kafka, etc.

Regardless of the underlying implementation,
the channel is a way to send messages from one part of the system to another.
Also, the channel can provide the way to broadcast certain events to the unknown multiple subscribers.

### Usage

The channel should be obtained from channel repository, which provides the channel by URI.

Only the single channel repository should be exist in JVM,
and created with some pairs of `ChannelProvider` and its name.

```java
ChannelRepository repository = new ChannelRepository(List.of(
ChannelProviderRegistry.of("redis", new RedisChannelProvider("redis")),
ChannelProviderRegistry.of("kafka", new KafkaChannelProvider("kafka"))
))
```

Then, the channel can be obtained by the URI.

**Hello World Example**

At instance-subscriber, the process should print the message "Hello, world!" which is published by instance-publisher.

```java
// Instance-subscriber

URI uri = URI.create("redis://system-out?param=foo");
SubChannel subChannel = repository.getSubChannel(uri);
subChannel.subscribe(message -> {
System.out.println(new String(message));
});
```

```java
// Instance-publisher

URI uri = URI.create("redis://system-out?param=foo");
PubChannel pubChannel = repository.getPubChannel(uri);
pubChannel.publish("Hello, world!".getBytes());
```

### Channel Service

This module also contains the ChannelService implementations which are used to manage instant
demand-supply interactions between the different parts of the system. These are very similar to the conventional
RPC calls, but it is designed to send demand to all the servers which are listening to the service.

There should be ChannelServiceServers, which supplies the demands in network in prior to the ChannelServiceClient
emitting the demand. Each servers catch all demands from the reserved channel for the service,
and exactly 0 or 1 server should supply the data to the supply channel.

For communication, all servers and clients must have the `ChannelServiceProtocol` which have the information
about the service, and the demand and supply channels.

```java
ChannelServiceProtocol protocol = ChannelServiceProtocol.<String, Long>builder()
.setDemandSerde(JacksonSerde.byClass(objectMapper, String.class))
.setDemandPubChannelURIProvider(demand -> URI.create("redis:char-count:demand"))
.setDemandSubChannelURI(URI.create("redis:char-count:demand"))
.setSupplySerde(JacksonSerde.byClass(objectMapper, Long.class))
.setSupplyChannelURIProvider(demand -> URI.create("redis:char-count:supply:" + demand.hashCode()))
.setRequestTimeout(Duration.ofSeconds(3))
.buildMono();
```

With the protocol and the channel repository, the ChannelServiceClient and ChannelServiceServer can be created.

**Server**

```java
ChannelServiceServer.buildMono(repository, protocol, demand -> demand.length()).listen();
```

**Client**

```java
MonoChannelServiceClient client = ChannelServiceClient.buildMono(repository, protocol);
Long result = client.demand("Hello, World!").block(); // 13
```
10 changes: 10 additions & 0 deletions channel/src/main/java/com/navercorp/pinpoint/channel/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@

/**
* @author youngjin.kim2
*
* Channel is publishable channel, and subscribable channel at the same time.
* If a pair of PubChannel, and SubChannel are bound in a single channel interface, the two channel
* should be able to communicate with each other.
* <br>
* In most cases, A paired PubChannel, and SubChannel are located at the different side of the network, and
* implemented with distributed systems like Redis, Kafka, etc.
*
* @see PubChannel
* @see SubChannel
*/
public interface Channel extends PubChannel, SubChannel {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

/**
* @author youngjin.kim2
*
* ChannelProvider provides PubChannel, and SubChannel by the key.
* <br>
* In distributed system, A pair of PubChannel, and SubChannel are connected each other if they have the same key.
* In the other word, even if the two processes are located at the different side of the network, they can communicate
* with each other if they have the same key.
*/
public interface ChannelProvider extends PubChannelProvider, SubChannelProvider {
static ChannelProvider pair(PubChannelProvider pub, SubChannelProvider sub) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

/**
* @author youngjin.kim2
*
* A pair for registration of ChannelProvider.
*
* @see ChannelProviderRepository
*/
public class ChannelProviderRegistry {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@

/**
* @author youngjin.kim2
*
* The channels are provided by the URI key with the formats like below:
* <br>
* <pre>
* scheme://key
* e.g. redis://hello-world-topic?param1=value1&param2=value2
* e.g. kafka://hello-world-topic?param1=value1&param2=value2
* </pre>
*
* If a pair of PubChannel, and SubChannel are obtained by the same key, they can communicate with each other.
*
* @see ChannelProvider
* @see Channel
*/
public class ChannelProviderRepository {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

/**
* @author youngjin.kim2
*
* PubChannel publishes the byte array parameter into the connected SubChannel.
*
* @see Channel
* @see SubChannel
*/
public interface PubChannel {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

/**
* @author youngjin.kim2
*
* Provides the PubChannel by the key.
*
* @see ChannelProvider
*/
public interface PubChannelProvider {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

/**
* @author youngjin.kim2
*
* SubChannel registers, or de-register the handler for the incoming byte array from PubChannel.
*
* @see Channel
* @see PubChannel
*/
public interface SubChannel {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

/**
* @author youngjin.kim2
*
* Provide the SubChannel by the key.
*
* @see ChannelProvider
*/
public interface SubChannelProvider {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

/**
* @author youngjin.kim2
*
* Handler object for the byte array from SubChannel
*
* @see SubChannel
*/
public interface SubConsumer {

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit fef2e1a

Please sign in to comment.