小編給大家分享一下Spring Cloud Stream怎么實(shí)現(xiàn)服務(wù)之間的通訊,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
創(chuàng)新互聯(lián)建站是專業(yè)的博山網(wǎng)站建設(shè)公司,博山接單;提供網(wǎng)站制作、成都網(wǎng)站制作,網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行博山網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!
Spring Cloud Stream
Srping cloud Bus的底層實(shí)現(xiàn)就是Spring Cloud Stream,Spring Cloud Stream的目的是用于構(gòu)建基于消息驅(qū)動(或事件驅(qū)動)的微服務(wù)架構(gòu)。Spring Cloud Stream本身對Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模塊進(jìn)行封裝(整合)和擴(kuò)展,下面我們實(shí)現(xiàn)兩個服務(wù)之間的通訊來演示Spring Cloud Stream的使用方法。
整體概述
服務(wù)要想與其他服務(wù)通訊要定義通道,一般會定義輸出通道和輸入通道,輸出通道用于發(fā)送消息,輸入通道用于接收消息,每個通道都會有個名字(輸入和輸出只是通道類型,可以用不同的名字定義很多很多通道),不同通道的名字不能相同否則會報錯(輸入通道和輸出通道不同類型的通道名稱也不能相同),綁定器是操作RabbitMQ或Kafka的抽象層,為了屏蔽操作這些消息中間件的復(fù)雜性和不一致性,綁定器會用通道的名字在消息中間件中定義主題,一個主題內(nèi)的消息生產(chǎn)者來自多個服務(wù),一個主題內(nèi)消息的消費(fèi)者也是多個服務(wù),也就是說消息的發(fā)布和消費(fèi)是通過主題進(jìn)行定義和組織的,通道的名字就是主題的名字,在RabbitMQ中主題使用Exchanges實(shí)現(xiàn),在Kafka中主題使用Topic實(shí)現(xiàn)。
準(zhǔn)備環(huán)境
創(chuàng)建兩個項(xiàng)目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我們用Spring Cloud Stream實(shí)現(xiàn)通訊,spring-cloud-stream-b我們用Spring Cloud Stream的底層模塊Spring Integration實(shí)現(xiàn)通訊。
兩個項(xiàng)目的POM文件依賴都是:
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> </dependencies>
spring-cloud-stream-binder-rabbit是指綁定器的實(shí)現(xiàn)使用RabbitMQ。
項(xiàng)目配置內(nèi)容application.properties:
spring.application.name=spring-cloud-stream-a server.port=9010 #設(shè)置默認(rèn)綁定器 spring.cloud.stream.defaultBinder = rabbit spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
spring.application.name=spring-cloud-stream-b server.port=9011 #設(shè)置默認(rèn)綁定器 spring.cloud.stream.defaultBinder = rabbit spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
啟動一個rabbitmq:
docker pull rabbitmq:3-management docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
編寫A項(xiàng)目代碼
在A項(xiàng)目中定義一個輸入通道一個輸出通道,定義通道在接口中使用@Input和@Output注解定義,程序啟動的時候Spring Cloud Stream會根據(jù)接口定義將實(shí)現(xiàn)類自動注入(Spring Cloud Stream自動實(shí)現(xiàn)該接口不需要寫代碼)。
A服務(wù)輸入通道,通道名稱ChatExchanges.A.Input,接口定義輸入通道必須返回SubscribableChannel:
public interface ChatInput { String INPUT = "ChatExchanges.A.Input"; @Input(ChatInput.INPUT) SubscribableChannel input(); }
A服務(wù)輸出通道,通道名稱ChatExchanges.A.Output,輸出通道必須返回MessageChannel:
public interface ChatOutput { String OUTPUT = "ChatExchanges.A.Output"; @Output(ChatOutput.OUTPUT) MessageChannel output(); }
定義消息實(shí)體類:
public class ChatMessage implements Serializable { private String name; private String message; private Date chatDate; //沒有無參數(shù)的構(gòu)造函數(shù)并行化會出錯 private ChatMessage(){} public ChatMessage(String name,String message,Date chatDate){ this.name = name; this.message = message; this.chatDate = chatDate; } public String getName(){ return this.name; } public String getMessage(){ return this.message; } public Date getChatDate() { return this.chatDate; } public String ShowMessage(){ return String.format("聊天消息:%s的時候,%s說%s。",this.chatDate,this.name,this.message); } }
在業(yè)務(wù)處理類上用@EnableBinding注解綁定輸入通道和輸出通道,這個綁定動作其實(shí)就是創(chuàng)建并注冊輸入和輸出通道的實(shí)現(xiàn)類到Bean中,所以可以直接是使用@Autowired進(jìn)行注入使用,另外消息的串行化默認(rèn)使用application/json格式(com.fastexml.jackson),最后用@StreamListener注解進(jìn)行指定通道消息的監(jiān)聽:
//ChatInput.class的輸入通道不在這里綁定,監(jiān)聽到數(shù)據(jù)會找不到AClient類的引用。 //Input和Output通道定義的名字不能一樣,否則程序啟動會拋異常。 @EnableBinding({ChatOutput.class,ChatInput.class}) public class AClient { private static Logger logger = LoggerFactory.getLogger(AClient.class); @Autowired private ChatOutput chatOutput; //StreamListener自帶了Json轉(zhuǎn)對象的能力,收到B的消息打印并回復(fù)B一個新的消息。 @StreamListener(ChatInput.INPUT) public void PrintInput(ChatMessage message) { logger.info(message.ShowMessage()); ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date()); chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build()); } }
到此A項(xiàng)目代碼編寫完成。
編寫B(tài)項(xiàng)目代碼
B項(xiàng)目使用Spring Integration實(shí)現(xiàn)消息的發(fā)布和消費(fèi),定義通道時我們要交換輸入通道和輸出通道的名稱:
public interface ChatProcessor { String OUTPUT = "ChatExchanges.A.Input"; String INPUT = "ChatExchanges.A.Output"; @Input(ChatProcessor.INPUT) SubscribableChannel input(); @Output(ChatProcessor.OUTPUT) MessageChannel output(); }
消息實(shí)體類:
public class ChatMessage { private String name; private String message; private Date chatDate; //沒有無參數(shù)的構(gòu)造函數(shù)并行化會出錯 private ChatMessage(){} public ChatMessage(String name,String message,Date chatDate){ this.name = name; this.message = message; this.chatDate = chatDate; } public String getName(){ return this.name; } public String getMessage(){ return this.message; } public Date getChatDate() { return this.chatDate; } public String ShowMessage(){ return String.format("聊天消息:%s的時候,%s說%s。",this.chatDate,this.name,this.message); } }
業(yè)務(wù)處理類用@ServiceActivator注解代替@StreamListener,用@InboundChannelAdapter注解發(fā)布消息:
@EnableBinding(ChatProcessor.class) public class BClient { private static Logger logger = LoggerFactory.getLogger(BClient.class); //@ServiceActivator沒有Json轉(zhuǎn)對象的能力需要借助@Transformer注解 @ServiceActivator(inputChannel=ChatProcessor.INPUT) public void PrintInput(ChatMessage message) { logger.info(message.ShowMessage()); } @Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT) public ChatMessage transform(String message) throws Exception{ ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readValue(message,ChatMessage.class); } //每秒發(fā)出一個消息給A @Bean @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000")) public GenericMessage<ChatMessage> SendChatMessage(){ ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date()); GenericMessage<ChatMessage> gm = new GenericMessage<>(message); return gm; } }
運(yùn)行程序
啟動A項(xiàng)目和B項(xiàng)目:
看完了這篇文章,相信你對“Spring Cloud Stream怎么實(shí)現(xiàn)服務(wù)之間的通訊”有了一定的了解,如果想了解更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!
網(wǎng)站標(biāo)題:SpringCloudStream怎么實(shí)現(xiàn)服務(wù)之間的通訊
當(dāng)前URL:http://vcdvsql.cn/article16/pdccgg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供軟件開發(fā)、App設(shè)計(jì)、網(wǎng)站導(dǎo)航、微信小程序、電子商務(wù)、用戶體驗(yàn)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)