1、BusServer每隔10秒，就会产生一个TimeInfo的消息
package com.neohope.springcloud.test.busserver;

import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.boot.context.annotation.DeterminableImports;

/**
 * Bus server: a Spring Cloud Stream source application that periodically
 * publishes a {@link TimeInfo} message on the {@link Source#OUTPUT} channel.
 */
@Configuration
@ComponentScan
@EnableAutoConfiguration
@EnableBinding(Source.class)
public class AppSource {

    private static final Logger logger = LoggerFactory.getLogger(AppSource.class);

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(AppSource.class, args);
    }

    /**
     * Message source polled by the inbound channel adapter. The poller is
     * configured with a fixed delay of 10000 ms and at most one message per poll.
     *
     * @return a source whose {@code receive()} yields a fresh {@link TimeInfo} message
     */
    @Bean
    @InboundChannelAdapter(
            value = Source.OUTPUT,
            poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<TimeInfo> timerMessageSource() {
        return new MessageSource<TimeInfo>() {
            @Override
            public Message<TimeInfo> receive() {
                // Logged before the message is built and handed to the channel.
                logger.info("Msg Sent");
                TimeInfo payload = currentTimeInfo();
                return MessageBuilder.withPayload(payload).build();
            }
        };
    }

    /** Builds the payload: current epoch milliseconds as text, plus the fixed label. */
    private static TimeInfo currentTimeInfo() {
        String epochMillis = String.valueOf(new Date().getTime());
        return new TimeInfo(epochMillis, "hiup");
    }

    /** Payload carried by each message: a timestamp string and a label. */
    public static class TimeInfo {

        private final String time;
        private final String label;

        /**
         * @param time  timestamp text
         * @param label free-form label
         */
        public TimeInfo(String time, String label) {
            this.time = time;
            this.label = label;
        }

        /** @return the timestamp text */
        public String getTime() {
            return time;
        }

        /** @return the label */
        public String getLabel() {
            return label;
        }
    }
}
2、pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Parent build; the empty relativePath forces resolution from the repositories. -->
    <parent>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>app-starters-build</artifactId>
        <version>1.1.3.M1</version>
        <relativePath />
    </parent>

    <groupId>com.neohope.springcloud.test</groupId>
    <artifactId>busserver</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>busserver</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Framework, version pinned explicitly -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>4.3.6.RELEASE</version>
        </dependency>

        <!--dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency-->

        <!-- Spring Cloud Stream with the Kafka binder; no version declared here -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

        <!-- Spring Boot, version pinned explicitly -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
            <version>1.5.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
            <version>1.5.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Milestone repository (the parent above is an M1 build); snapshots are disabled. -->
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/libs-milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>
3、application.properties
# Per the Spring Boot reference, a port of -1 switches off the HTTP endpoints.
server.port=-1

# Binding for the "output" channel: target destination and payload content type.
spring.cloud.stream.bindings.output.destination=timerTopic
spring.cloud.stream.bindings.output.content-type=application/json

# Kafka binder connection settings.
spring.cloud.stream.kafka.binder.zkNodes=localhost
#spring.cloud.stream.kafka.binder.brokers=localhost
4、BusClient会收到对应的消息并输出日志
package com.neohope.springcloud.test.busclient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.boot.SpringApplication; @Configuration @ComponentScan @EnableAutoConfiguration @EnableBinding(Sink.class) public class AppSink { private static Logger logger = LoggerFactory.getLogger(AppSink.class); public static void main(String[] args) { SpringApplication.run(AppSink.class, args); } @StreamListener(Sink.INPUT) public void loggerSink(SinkTimeInfo sinkTimeInfo) { logger.info("Received: " + sinkTimeInfo.toString()); } public static class SinkTimeInfo{ private String time; private String label; public String getTime() { return time; } public void setTime(String time) { this.time = time; } public void setSinkLabel(String label) { this.label = label; } public String getLabel() { return label; } @Override public String toString() { return "SinkTimeInfo [time=" + time + ", label=" + label + "]"; } } }
5、pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Parent build; the empty relativePath forces resolution from the repositories. -->
    <parent>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>app-starters-build</artifactId>
        <version>1.1.3.M1</version>
        <relativePath />
    </parent>

    <groupId>com.neohope.springcloud.test</groupId>
    <artifactId>busclient</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>busclient</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Framework, version pinned explicitly -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>4.3.6.RELEASE</version>
        </dependency>

        <!--dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency-->

        <!-- Spring Cloud Stream with the Kafka binder; no version declared here -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

        <!-- Spring Boot, version pinned explicitly -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
            <version>1.5.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
            <version>1.5.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Milestone repository (the parent above is an M1 build); snapshots are disabled. -->
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/libs-milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>
6、application.properties
# Per the Spring Boot reference, a port of -1 switches off the HTTP endpoints.
server.port=-1

# Binding for the "input" channel: source destination, payload content type and consumer group.
spring.cloud.stream.bindings.input.destination=timerTopic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=timerGroup

# resetOffsets is a per-binding Kafka consumer property, so it must live under
# spring.cloud.stream.kafka.bindings.<channel>.consumer; the former key
# spring.cloud.stream.binder.kafka.resetOffsets is not a recognised property.
spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true

# Kafka binder connection settings.
spring.cloud.stream.kafka.binder.zkNodes=localhost
#spring.cloud.stream.kafka.binder.brokers=localhost