本次主要是對thingsboard的源碼進行一個簡單的分析,從設(shè)備發(fā)送遙測數(shù)據(jù)到平臺的角度跟進后端代碼邏輯探究其流轉(zhuǎn)過程,好了,話不多說,直接進入正題。
創(chuàng)新互聯(lián)專注于江南網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供江南營銷型網(wǎng)站建設(shè),江南網(wǎng)站制作、江南網(wǎng)頁設(shè)計、江南網(wǎng)站官網(wǎng)定制、小程序制作服務(wù),打造江南網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供江南網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。數(shù)據(jù)流轉(zhuǎn)大概流程找到MqttTransportService類==》通過@PostConstruct注解在項目啟動后進入init()方法
==》里面綁定了MqttTransportServerInitializer類即mqtt服務(wù)初始化
==》MqttTransportServerInitializer類繼承ChannelInitializer類重寫了initChannel方法
==》initChannel方法里綁定了MqttTransportHandler
==》進入MqttTransportHandler的channelRead方法,驗證消息類型為mqtt時轉(zhuǎn)入processMqttMsg方法
==》processMqttMsg里進行判斷:消息類型為連接時轉(zhuǎn)入processConnect,設(shè)備session為臨時的轉(zhuǎn)入processProvisionSessionMsg
否則轉(zhuǎn)入enqueueRegularSessionMsg方法,這里先探討轉(zhuǎn)入enqueueRegularSessionMsg
==》轉(zhuǎn)入enqueueRegularSessionMsg后調(diào)用processMsgQueue將消息投遞到隊列
==》跟進去發(fā)現(xiàn)里面調(diào)用了processRegularSessionMsg方法
==》processRegularSessionMsg里根據(jù)消息的類型進行轉(zhuǎn)發(fā),比如:發(fā)布,訂閱,取消訂閱,取消連接等等
==》跟進PUBLISH,轉(zhuǎn)入processPublish方法
==》轉(zhuǎn)入processDevicePublish,進入發(fā)現(xiàn)這里根據(jù)消息的主題進行轉(zhuǎn)發(fā),這里選擇isDeviceTelemetryTopic對應(yīng)的transportService.process接口實現(xiàn)
==》發(fā)現(xiàn)這里對消息封裝了一下之后轉(zhuǎn)入sendToRuleEngine,將消息發(fā)送到規(guī)則鏈
》繼續(xù)跟進進入sendToRuleEngine,發(fā)現(xiàn)調(diào)用ruleEngineMsgProducer.send,即將消息通過生產(chǎn)者發(fā)送到隊列
這里對應(yīng)多個實現(xiàn),例如:inMemory,Kafka,RabbitMQ等等,默認發(fā)送到inMemory內(nèi)存
》有生產(chǎn)者那肯定有消費者,我們找到DefaultTbRuleEngineConsumerService核心消費者
》找到launchMainConsumers方法》launchConsumer》consumerLoop
》發(fā)現(xiàn)consumerLoop是個循環(huán),將消息取出來消費,轉(zhuǎn)submitMessage方法
》然后轉(zhuǎn)入forwardToRuleEngineActor》調(diào)用actorContext.tell,這里開始就是Actor模型流轉(zhuǎn)了,不清楚的可以去百度搜索一下
》首先調(diào)用appActor.tell通過根appActor調(diào)用tell方法轉(zhuǎn)入enqueue方法,里面對消息進行了分類,分為高優(yōu)先級和正常消息隊列
還有initActor()方法創(chuàng)建一系列actor,大概流程:AppActor》TenantActor》RuleChainActor》RuleNodeActor,我們先轉(zhuǎn)入tryProcessQueue方法
==>然后發(fā)現(xiàn)調(diào)用了processMailbox,發(fā)現(xiàn)這里是將之前分類的消息依次取出來然后調(diào)用actor.process(msg)方法依次向下流轉(zhuǎn)處理消息
》ContextAwareActor》process==》doProcess==》…
詳細流轉(zhuǎn)過程我們首先找到位于common模塊下transport模塊下的mqtt下的MqttTransportService類
在init方法中我們可以看到:其實現(xiàn)了bossGroup,workerGroup兩個線程組,綁定端口,綁定Handler相關(guān)的內(nèi)容,這是netty相關(guān)的學(xué)過的應(yīng)該非常熟悉,這里也可以猜想到thingsboard里的mqtt是基于netty封裝實現(xiàn)的。
項目在啟動時,基于spring的@Service注解和 @PostConstruct注解,會進入到我們的init()方法里,然后轉(zhuǎn)入MqttTransportServerInitializer,即mqtt服務(wù)初始化過程。
進入到MqttTransportServerInitializer方法
這里主要是繼承了ChannelInitializer類重寫了initChannel初始化管道方法,
設(shè)置一些解碼,ip過濾相關(guān)的,然后轉(zhuǎn)入MqttTransportHandler具體的邏輯處理。
轉(zhuǎn)入MqttTransportHandler我們直接看到channelRead方法,客戶端在連接mqtt服務(wù)端時會進入到這里
這里我們可以看到,主要對消息做了一個判斷,是否是mqtt類型的消息,是則轉(zhuǎn)入processMqttMsg方法處理mqtt消息。
在這個方法里面我們可以看到,消息類型為連接時轉(zhuǎn)入processConnect方法處理連接,返回相關(guān)ack確認,設(shè)備session為臨時的轉(zhuǎn)入processProvisionSessionMsg,否則轉(zhuǎn)入enqueueRegularSessionMsg處理消息,這里我們主要關(guān)注最后一個。
這里可以看到消息進入時做了一個判斷,消息數(shù)量大于隊列容納消息長度時直接return,這個也可以在application.yml里面配置的。然后直接調(diào)用processMsgQueue方法處理消息到隊列,我們跟進去看看。
這里我們可以看到,根據(jù)消息的類型來做后續(xù)的邏輯處理,例如:發(fā)布,訂閱,取消訂閱,心跳等等。這里我們進入PUBLISH消息上報這里。
這里有判斷,我們直接轉(zhuǎn)入processDevicePublish方法,上面那個是和網(wǎng)關(guān)相關(guān)的我們目前沒涉及到,先不管。
看到processDevicePublish這個方法里面有很多個分支,例如:發(fā)布設(shè)備屬性,設(shè)備遙測數(shù)據(jù),設(shè)備遠程控制等等,我們主要跟進設(shè)備遙測數(shù)據(jù)這里。
這里我們可以看到,對消息進行一下封裝,加入了設(shè)備名稱,設(shè)備類型,發(fā)送的時間戳,然后將消息發(fā)送到規(guī)則鏈,進入sendToRuleEngine方法
可以看到這里指定了后面要發(fā)送到隊列的名稱,默認為Main,從
String defaultQueueName = deviceProfile.getDefaultQueueName();
queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;可以看出在設(shè)備配置里面是可以指定發(fā)送到那個隊列的名稱的
我們繼續(xù)跟進sendToRuleEngine方法
這里可以看到我們的消息是通過ruleEngineMsgProducer生產(chǎn)者進行發(fā)送到隊列的, ruleEngineMsgProducer.send這個接口對應(yīng)了多個實現(xiàn)
有基于內(nèi)存的,Kafka,RabbitMQ等等,默認是基于內(nèi)存的,這個可以在配置文件內(nèi)修改。
有生產(chǎn)者那么對應(yīng)肯定有消費者,此時我們需要轉(zhuǎn)入消費者,看看它后面的邏輯運轉(zhuǎn)是怎樣的。
我們找到DefaultTbRuleEngineConsumerService核心消費類
這里可以明顯地看到消費者依次從隊列里取出消息處理后調(diào)用submitMessage處理后續(xù)邏輯
forwardToRuleEngineActor從這個方法名字我們看到消息將推送到規(guī)則鏈Actor,后面就開始進入Actor模型流轉(zhuǎn)了,這個不清楚的可以百度搜索一下。
首先進入的是AppActor.tell方法,這個是頂級actor,是一切actor創(chuàng)建的源頭。
流轉(zhuǎn)過程:
AppActor==》TenantActor==》RuleChainActor==》RuleNodeActor
我們接著跟進
!](https://img-blog.csdnimg.cn/6d61049e3bc64c8aa8f60262b0868439.png#pic_center)
這里消息進行了分類,創(chuàng)建了兩個存儲消息的隊列,一個高優(yōu)先級隊列,一個正常隊列,根據(jù)消息的類型進行添加,后調(diào)用tryProcessQueue方法進一步處理消息,
initActor()這個對應(yīng)初始化各類的actor,
我們進入tryProcessQueue方法。
可以看到使用了線程池創(chuàng)建的線程去執(zhí)行我們的processMailbox方法
在processMailbox方法里,可以看到,是先從高優(yōu)先級隊列里取消息進行處理,直至其高優(yōu)先級隊列為空時才從正常隊列里取消息進行處理,消息取出后調(diào)用actor.process方法進行消息的處理流轉(zhuǎn),整個過程一直循環(huán)。
跟進actor.process方法,進入ContextAwareActor的process方法,進入doProcess方法,然后流轉(zhuǎn)到各個分類的actor,這里我們選擇一個進行跟進
ruleChainActor
可以看到這里也有很多分支,這里我們選兩個進行分析:首先是:onRuleNodeToSelfMsg,
跟進后發(fā)現(xiàn)調(diào)用tbNode.onMsg方法進入我們的節(jié)點的消息處理方法,點擊我們可以發(fā)現(xiàn)對應(yīng)很多個節(jié)點實現(xiàn),這個對應(yīng)的就是我們前臺web界面規(guī)則鏈里的一個個流轉(zhuǎn)的節(jié)點
我們隨機選擇一個進入CalculateDeltaNode
可以看到每個節(jié)點都有init(),onMsg方法,初始化和對應(yīng)節(jié)點的邏輯實現(xiàn),后續(xù)如果自己想自定義節(jié)點時可以參照著寫。到這里我們基本清楚了整個消息流轉(zhuǎn)的流程,先告一段落了。
再回到之前那個onQueueToRuleEngineMsg
我們選擇onQueueToRuleEngineMsg,對應(yīng)消息流轉(zhuǎn)到規(guī)則鏈節(jié)點。
這幾個也可以跟進去看看,最終也是流轉(zhuǎn)到了規(guī)則鏈節(jié)點里,消息一直在這幾個過程里面循環(huán),就先講到這里了,后面有時間再繼續(xù)研究。
thingsboard的源碼還是比較復(fù)雜一點的,其中有很多的分支流轉(zhuǎn),循環(huán)嵌套,而且很多地方使用的都是異步操作,線程池操作,斷點分析有時也不容易,不過不管是代碼規(guī)范還是邏輯上還是比較嚴(yán)謹(jǐn)?shù)?,耐心一點還是比較容易理解清楚整個流程的。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧
當(dāng)前文章:thingsboard源碼分析--從設(shè)備數(shù)據(jù)上報的角度-創(chuàng)新互聯(lián)
網(wǎng)站網(wǎng)址:http://vcdvsql.cn/article44/dsodhe.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)網(wǎng)站建設(shè)、外貿(mào)建站、面包屑導(dǎo)航、品牌網(wǎng)站建設(shè)、軟件開發(fā)、小程序開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容