使用Kafka
来源:jhipster |
时间:2018-11-08
|
|

使用Kafka

特征

Kafka是一种流行的发布 - 订阅消息传递系统。JHipster对Kafka有一个可选的支持,它将:

  • 使用JHipster 配置Spring Cloud Stream

  • application-*.yml文件中添加必要的配置以获取示例topic-jhipster主题,并为Kafka添加健康检查监视器(将在health管理屏幕中提供)。

  • 使用示例topic-jhipster主题生成Docker Compose配置文件,因此只需键入即可使用Kafka docker-compose -f src/main/docker/kafka.yml up -d

  • 在使用Docker时,在微服务环境中为Kafka提供支持。如果一个微服务或一个网关使用Kafka,Docker Compose子生成器将生成特定的Kafka配置。然后,所有微服务和网关将使用该Kafka代理来获取所有消息。代理对于所有应用程序都是通用的,因为它通常用作应用程序之间的消息代理。

在JHipster应用程序中使用Kafka和Spring Cloud Stream的教程

条件

生成新应用程序,并确保Asynchronous messages using Apache Kafka在提示您选择要使用的技术时选择生成Docker Compose配置文件,您可以使用以下命令启动Kafka:

docker-compose -f src/main/docker/kafka.yml up -d

模型

创建一个简单的模型来表示我们将通过Kafka主题发送的消息。

public class Greeting {
    private String message;

    public Greeting() {
    }

    public String getMessage() {
        return message;
    }

    public Greeting setMessage(String message) {
        this.message = message;
        return this;
    }
}

消息频道

Spring Cloud Stream引入了一个名为的抽象层message channels生产者将消息发送到输出通道,消费者订阅消息的输入通道。这使您可以灵活地使用不同的消息传递系统(称为绑定器),而无需编写大量特定于平台的代码。

让我们创建输出和输入通道。

输出通道
public interface ProducerChannel {

    String CHANNEL = "messageChannel";

    @Output
    MessageChannel messageChannel();
}
输入频道
public interface ConsumerChannel {

    String CHANNEL = "subscribableChannel";

    @Input
    SubscribableChannel subscribableChannel();
}

组态

我们需要告诉Spring Cloud Stream我们在Jhipster生成的配置类中的频道。

@EnableBinding(value = {Source.class, ProducerChannel.class, ConsumerChannel.class})
public class MessagingConfiguration {

}

我们还需要配置我们的应用程序以与Kafka交谈。

spring:
    cloud:
      stream:
        bindings:
            messageChannel:
                destination: greetings
                content-type: application/json
            subscribableChannel:
                destination: greetings

这对应于:

spring.cloud.stream.bindings.<channelName>.destination.<topic>

制片人和消费者

制片人资源

让我们创建一个简单的REST端点,我们可以调用它来向Kafka主题发送消息greetings

@RestController
@RequestMapping("/api")
public class ProducerResource{

    private MessageChannel channel;

    public ProducerResource(ProducerChannel channel) {
        this.channel = channel.messageChannel();
    }

    @GetMapping("/greetings/{count}")
    @Timed
    public void produce(@PathVariable int count) {
        while(count > 0) {
            channel.send(MessageBuilder.withPayload(new Greeting().setMessage("Hello world!: " + count)).build());
            count--;
        }
    }

}
消费者服务

我们可以使用StreamListener消息映射和自动类型转换来使用消息。

@Service
public class ConsumerService {

    private final Logger log = LoggerFactory.getLogger(ConsumerService.class);


    @StreamListener(ConsumerChannel.CHANNEL)
    public void consume(Greeting greeting) {
        log.info("Received message: {}.", greeting.getMessage());
    }
}

运行应用程序

允许访问端点SecurityConfiguration.java

.antMatchers("/api/greetings/**").permitAll()

如果调用端点http://localhost:8080/api/greetings/5,则应该看到记录到控制台的消息。

您可以在eosimosu / jhipster-kafka上找到完整的源代码


提交
查看更多评论
没有更多评论