基于JMS的數據匯集系統的研究與實現
摘要 在深入研究JMS異步消息處理機制的基礎上,利用JMS為分布在不同系統中的氣象數據的匯集提供了一個可行的方案,解開了應用程序間的耦合,并增強了應用程序的可擴展性和可靠性。
關鍵詞 消息、JMS、點對點、發布/訂閱、數據匯集、氣象
現如今,越來越多的企業、單位面臨著各種各樣的數據集成和系統整合,CORBA、DCOM、RMI等RPC中間件技術也應運而生,但由于采用RPC同步處理技術,在性能、健壯性、可擴展性上都存在著諸多缺點。而基于消息的異步處理模型采用非阻塞的調用特性,發送者將消息發送給消息服務器,消息服務器在合適的時候再將消息轉發給接收者;發送和接收是異步的,發送者無需等待,二者的生命周期也可以不必相同,而且發送者可以將消息間接傳給多個接收者,大大提高了程序的性能、可擴展性及健壯性,這使得異步處理模型在分布式應用上比起同步處理模型更具有吸引力。
本文詳細介紹了Java中的異步處理機制――基于JMS的異步消息處理技術,并結合實例討論了基于JMS的氣象系統數據匯集系統的設計與實現。
Java消息服務-JMS
JMS是由包括Sun Microsystems和IBM等在內的幾個公司合作設計的一個面向消息的中間件(MOM)API。JMS定義了Java 中訪問消息中間件的接口,但JMS 只是接口,并沒有給予實現,實現JMS 接口的消息中間件稱為JMS提供者(JMS Provider)。JMS的目的是應用程序能在異步情況下可靠地傳輸和接受消息。從編程的角度來看,JMS可以被當作一個容器管理的資源,與JDBC連接類似。正如你可以通過JDBC去訪問許多不同的關系數據庫一樣,你可以通過JMS訪問獨立于廠商的消息服務系統。
JMS主要包括三個部分――JMS提供者(JMS Provider)、消息發送者(Message Producer)和消息接收者(Message Consumer)。簡單的說,JMS提供者是指實現JMS API接口的消息系統,是消息的中轉站;消息發送者是指消息的創造和發送者,是消息的發源地;消息接收者是指接收消息的應用程序,是消息最終的目的地。
JMS消息是異步處理的,消息發送者可以發送一個消息而無須等待響應。消息發送者將消息發送到一條虛擬的通道(主題或隊列)上,消息接收者則訂閱或是監聽該通道。一條消息可能最終轉發給一個或多個消息接收者,這些接收者都無需對消息發送者做出回應。
1、JMS消息的組成
在J2EE1.4后,JMS API不再區分在點對點域和發布/訂閱域中創建的消息。JMS消息由以下三部分組成(如圖1.1所示):
消息頭(header)――JMS消息頭包含了許多字段,它們是消息發送后由JMS提供者或消息發送者產生,用來表示消息、設置優先權和失效時間等等,并且為消息確定路由。
屬性(property)――由消息發送者產生,用來添加刪除消息頭以外的附加信息。
消息體(body)――由消息發送者產生,JMS中定義了5種消息體:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

2、JMS消息傳遞模式
JMS支持兩種消息傳遞模式――點對點模式(P2P)和發布/訂閱模式(Publish/Subscribe)。這兩種都是人們熟知的push模式,消息的發送者是活動的發起人,而接收者則是被動的接收消息。在JMS中,這些消息傳遞模式被稱為消息域(message domain)。
2.1 JMS點對點消息域
在點對點模式中,發送者和接收者對消息傳送的目的地址達成一致,即所謂的隊列(queue)。消息隊列位于JMS提供者中,消息發送者向一個消息隊列發送消息,消息接收者可以在消息發送后的任何時刻從這個隊列中(被動地)接收消息,在接收者確認之前消息一直保存在消息隊列中直到過期。點對點模式的結構圖如圖1.2所示:
JMS點對點消息域具有以下特點:
·每條消息能而且只能被一個接收者接收。
·每條消息或者被接收者從隊列中取走,或者被 JMS提供者在超時的情況下刪除。
·消息產生的時候接收者不一定要存在,接收者可以在消息產生后的任何時間里取走消息。
·接收者不能請求一個消息。
·接收者必須在收到消息后發出確認信息。

2.2 JMS發布/訂閱消息域
在發布/訂閱模式下,發送者被稱為發布者(publisher),一個消息可已有很多接收者,這些接收者被稱為訂閱者(subscriber)。發布/訂閱模式采用與點對點模式完全不同的消息發送模式。在發布/訂閱模式下,發布者給一個主題(topic)發送消息,多個訂閱者在訂閱的時候可以訂閱他們感興趣的主題。一個主題可以被多個訂閱者訂閱,一個訂閱者也可以訂閱多個主題。一個主題的消息只被發給該主題的所有訂閱者。訂閱者只能接收它訂閱的主題中的消息,并且,在默認情況下,訂閱者在消息發送的時候必須是活動的,并隨時準備接收消息,否則它將錯過該消息。為了避免這種時間依賴性JMS API允許訂閱者創建持久訂閱。發布/訂閱模式的結構圖如圖1.3所示:
JMS發布/訂閱消息域具有以下特點:
·每一條消息由一個發布者創建而由0個或多個訂閱者接收它。
·消息立刻被分發給現有的訂閱者。
·訂閱者必須在消息發送的時候存在以接收消息。
·持久訂閱允許訂閱者接收它處于非活動狀態時由發布者向主題發送的消息。
·訂閱者必須在接收到消息后發出確認信息。

系統的設計與實現
目前,基層氣象臺站的每個分系統都是獨立的,各自的數據都保存在本機上,彼此互不聯系,而匯集系統就是為了把分散在每個系統上的數據匯集到一個統一的數據庫中。為了實現這個目的可以有多種方法。
(1)由各個分系統解析本機的數據,然后直接向數據庫中寫記錄,但是這樣勢必對統一數據庫的安全性造成影響,而且各分系統的計算機莨莠不齊,老舊的機器能不能提供更多的資源也是一個問題;
(2)由服務器上的一個程序統一從各個系統上獲取數據,然后寫入數據庫,但是這樣該服務器程序與各個分系統便形成了緊耦合,一旦分系統發生變化,或者要擴展一個分系統便要重新更改程序代碼,使得整個系統的穩定性和可擴展性受到很大影響;
而JMS消息機制的異步傳輸模型恰好能完美的解決以上兩種方法存在的問題。
1、系統的設計
基于JMS的氣象數據匯集系統由五個模塊組成:代理模塊、消息發送模塊(MessageSender)、消息接收模塊(MessageReceiver)、數據格式轉換模塊(DataFormatConverser)和統一數據庫訪問接口(UDAI)。系統結構圖如下圖所示:

基于JMS的氣象數據匯集系統結構圖
(1)代理模塊
代理模塊是運行于分系統上的Java程序,它只有兩個功能:監測到有新數據到,立即將數據文件復制到服務器上的文件緩沖區,然后調用消息發送模塊,并將數據文件的文件名,文件大小,生成時間,業務屬性等信息傳遞給消息發送模塊。由于代理程序不需要解析數據文件,也不需要寫數據庫,降低了對系統資源的消耗,即使在比較老舊的機器上運行也不會出現問題。而且一旦消息發送成功,后面的工作便與分系統無關,這樣也就解除了分系統和服務器之間的耦合。
(2)消息發送模塊
消息發送模塊作為一個會話Bean部署在服務器上,它的功能主要是:將代理模塊傳來的數據文件的文件名,文件大小,生成時間,業務屬性等信息組裝成消息,然后發送到與其業務屬性相對應的消息隊列中(每一類業務屬性對應一個消息隊列)。消息發送模塊運行與服務器上的Bean容器中,不占用分系統資源,減小了分系統上的資源開銷;而且該模塊被部署為一個Bean,方便了以后分系統的擴展和變化。
(3)消息接收模塊
消息接收模塊是一個運行于服務器端的JMS客戶端,每一個消息隊列都對應一個消息接收模塊。消息接收模塊通過注冊一個MessageListener接口,監聽消息隊列上的消息;接收到消息后便調用數據格式轉換模塊,將消息內容和數據文件在文件緩沖區的絕對地址作為參數傳給數據格式轉換模塊。之所以為每一個消息隊列對應一個消息接收模塊,是考慮到以后系統的擴展,如果要增加一個隊列,只需增加一個接收模塊,并不影響原來系統的正常運行。
(4)數據格式轉換模塊
數據格式轉換模塊的主要功能是:解析數據文件并按照一定的格式生成標準的XML文檔供統一數據庫訪問接口使用。
(5)統一數據庫訪問接口
統一數據庫訪問接口提供一個統一的數據庫寫入接口,不管以后系統如何變化,只要生成標準的XML文檔便可以使用此接口來訪問數據庫。
系統中真正與JMS相關的模塊是消息發送模塊和消息接收模塊,由于每一類業務屬性對應一個消息隊列,每一個消息隊列對應一個接收模塊,所以選擇PTP的傳遞模式作為系統的消息傳遞模式。
2、系統的實現
(1)建立消息隊列
系統使用JBoss作為J2EE服務器,通過編輯配置文件jbossMQ-destinations-service.xml來定義應用所需的消息隊列地址信息。其部分代碼如下:
(2)消息發送模塊的實現
MessageSender SessionBean的功能是構造消息并向指定的消息隊列發送消息,其部分代碼如下:
(3)消息接收模塊的實現
消息接收模塊通過注冊MessageListener接口來監聽消息隊列,實現其onMessage()方法處理消息。其部分代碼如下:
結束語
本系統應用JMS異步消息作為分系統與服務器的關聯機制,分系統只需要通過消息向服務器提交任務,之后便不需要關心數據如何處理,何時處理,徹底解開了分系統與服務其的耦合,即保證了數據庫的安全可靠,也減少了分系統的資源消耗。而且系統的各個模塊都是相對獨立,對于今后的變化和進一步的擴展提供了更大的空間。
關鍵詞 消息、JMS、點對點、發布/訂閱、數據匯集、氣象
現如今,越來越多的企業、單位面臨著各種各樣的數據集成和系統整合,CORBA、DCOM、RMI等RPC中間件技術也應運而生,但由于采用RPC同步處理技術,在性能、健壯性、可擴展性上都存在著諸多缺點。而基于消息的異步處理模型采用非阻塞的調用特性,發送者將消息發送給消息服務器,消息服務器在合適的時候再將消息轉發給接收者;發送和接收是異步的,發送者無需等待,二者的生命周期也可以不必相同,而且發送者可以將消息間接傳給多個接收者,大大提高了程序的性能、可擴展性及健壯性,這使得異步處理模型在分布式應用上比起同步處理模型更具有吸引力。
本文詳細介紹了Java中的異步處理機制――基于JMS的異步消息處理技術,并結合實例討論了基于JMS的氣象系統數據匯集系統的設計與實現。
Java消息服務-JMS
JMS是由包括Sun Microsystems和IBM等在內的幾個公司合作設計的一個面向消息的中間件(MOM)API。JMS定義了Java 中訪問消息中間件的接口,但JMS 只是接口,并沒有給予實現,實現JMS 接口的消息中間件稱為JMS提供者(JMS Provider)。JMS的目的是應用程序能在異步情況下可靠地傳輸和接受消息。從編程的角度來看,JMS可以被當作一個容器管理的資源,與JDBC連接類似。正如你可以通過JDBC去訪問許多不同的關系數據庫一樣,你可以通過JMS訪問獨立于廠商的消息服務系統。
JMS主要包括三個部分――JMS提供者(JMS Provider)、消息發送者(Message Producer)和消息接收者(Message Consumer)。簡單的說,JMS提供者是指實現JMS API接口的消息系統,是消息的中轉站;消息發送者是指消息的創造和發送者,是消息的發源地;消息接收者是指接收消息的應用程序,是消息最終的目的地。
JMS消息是異步處理的,消息發送者可以發送一個消息而無須等待響應。消息發送者將消息發送到一條虛擬的通道(主題或隊列)上,消息接收者則訂閱或是監聽該通道。一條消息可能最終轉發給一個或多個消息接收者,這些接收者都無需對消息發送者做出回應。
1、JMS消息的組成
在J2EE1.4后,JMS API不再區分在點對點域和發布/訂閱域中創建的消息。JMS消息由以下三部分組成(如圖1.1所示):
消息頭(header)――JMS消息頭包含了許多字段,它們是消息發送后由JMS提供者或消息發送者產生,用來表示消息、設置優先權和失效時間等等,并且為消息確定路由。
屬性(property)――由消息發送者產生,用來添加刪除消息頭以外的附加信息。
消息體(body)――由消息發送者產生,JMS中定義了5種消息體:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

2、JMS消息傳遞模式
JMS支持兩種消息傳遞模式――點對點模式(P2P)和發布/訂閱模式(Publish/Subscribe)。這兩種都是人們熟知的push模式,消息的發送者是活動的發起人,而接收者則是被動的接收消息。在JMS中,這些消息傳遞模式被稱為消息域(message domain)。
2.1 JMS點對點消息域
在點對點模式中,發送者和接收者對消息傳送的目的地址達成一致,即所謂的隊列(queue)。消息隊列位于JMS提供者中,消息發送者向一個消息隊列發送消息,消息接收者可以在消息發送后的任何時刻從這個隊列中(被動地)接收消息,在接收者確認之前消息一直保存在消息隊列中直到過期。點對點模式的結構圖如圖1.2所示:
JMS點對點消息域具有以下特點:
·每條消息能而且只能被一個接收者接收。
·每條消息或者被接收者從隊列中取走,或者被 JMS提供者在超時的情況下刪除。
·消息產生的時候接收者不一定要存在,接收者可以在消息產生后的任何時間里取走消息。
·接收者不能請求一個消息。
·接收者必須在收到消息后發出確認信息。

2.2 JMS發布/訂閱消息域
在發布/訂閱模式下,發送者被稱為發布者(publisher),一個消息可已有很多接收者,這些接收者被稱為訂閱者(subscriber)。發布/訂閱模式采用與點對點模式完全不同的消息發送模式。在發布/訂閱模式下,發布者給一個主題(topic)發送消息,多個訂閱者在訂閱的時候可以訂閱他們感興趣的主題。一個主題可以被多個訂閱者訂閱,一個訂閱者也可以訂閱多個主題。一個主題的消息只被發給該主題的所有訂閱者。訂閱者只能接收它訂閱的主題中的消息,并且,在默認情況下,訂閱者在消息發送的時候必須是活動的,并隨時準備接收消息,否則它將錯過該消息。為了避免這種時間依賴性JMS API允許訂閱者創建持久訂閱。發布/訂閱模式的結構圖如圖1.3所示:
JMS發布/訂閱消息域具有以下特點:
·每一條消息由一個發布者創建而由0個或多個訂閱者接收它。
·消息立刻被分發給現有的訂閱者。
·訂閱者必須在消息發送的時候存在以接收消息。
·持久訂閱允許訂閱者接收它處于非活動狀態時由發布者向主題發送的消息。
·訂閱者必須在接收到消息后發出確認信息。

系統的設計與實現
目前,基層氣象臺站的每個分系統都是獨立的,各自的數據都保存在本機上,彼此互不聯系,而匯集系統就是為了把分散在每個系統上的數據匯集到一個統一的數據庫中。為了實現這個目的可以有多種方法。
(1)由各個分系統解析本機的數據,然后直接向數據庫中寫記錄,但是這樣勢必對統一數據庫的安全性造成影響,而且各分系統的計算機莨莠不齊,老舊的機器能不能提供更多的資源也是一個問題;
(2)由服務器上的一個程序統一從各個系統上獲取數據,然后寫入數據庫,但是這樣該服務器程序與各個分系統便形成了緊耦合,一旦分系統發生變化,或者要擴展一個分系統便要重新更改程序代碼,使得整個系統的穩定性和可擴展性受到很大影響;
而JMS消息機制的異步傳輸模型恰好能完美的解決以上兩種方法存在的問題。
1、系統的設計
基于JMS的氣象數據匯集系統由五個模塊組成:代理模塊、消息發送模塊(MessageSender)、消息接收模塊(MessageReceiver)、數據格式轉換模塊(DataFormatConverser)和統一數據庫訪問接口(UDAI)。系統結構圖如下圖所示:

基于JMS的氣象數據匯集系統結構圖
(1)代理模塊
代理模塊是運行于分系統上的Java程序,它只有兩個功能:監測到有新數據到,立即將數據文件復制到服務器上的文件緩沖區,然后調用消息發送模塊,并將數據文件的文件名,文件大小,生成時間,業務屬性等信息傳遞給消息發送模塊。由于代理程序不需要解析數據文件,也不需要寫數據庫,降低了對系統資源的消耗,即使在比較老舊的機器上運行也不會出現問題。而且一旦消息發送成功,后面的工作便與分系統無關,這樣也就解除了分系統和服務器之間的耦合。
(2)消息發送模塊
消息發送模塊作為一個會話Bean部署在服務器上,它的功能主要是:將代理模塊傳來的數據文件的文件名,文件大小,生成時間,業務屬性等信息組裝成消息,然后發送到與其業務屬性相對應的消息隊列中(每一類業務屬性對應一個消息隊列)。消息發送模塊運行與服務器上的Bean容器中,不占用分系統資源,減小了分系統上的資源開銷;而且該模塊被部署為一個Bean,方便了以后分系統的擴展和變化。
(3)消息接收模塊
消息接收模塊是一個運行于服務器端的JMS客戶端,每一個消息隊列都對應一個消息接收模塊。消息接收模塊通過注冊一個MessageListener接口,監聽消息隊列上的消息;接收到消息后便調用數據格式轉換模塊,將消息內容和數據文件在文件緩沖區的絕對地址作為參數傳給數據格式轉換模塊。之所以為每一個消息隊列對應一個消息接收模塊,是考慮到以后系統的擴展,如果要增加一個隊列,只需增加一個接收模塊,并不影響原來系統的正常運行。
(4)數據格式轉換模塊
數據格式轉換模塊的主要功能是:解析數據文件并按照一定的格式生成標準的XML文檔供統一數據庫訪問接口使用。
(5)統一數據庫訪問接口
統一數據庫訪問接口提供一個統一的數據庫寫入接口,不管以后系統如何變化,只要生成標準的XML文檔便可以使用此接口來訪問數據庫。
系統中真正與JMS相關的模塊是消息發送模塊和消息接收模塊,由于每一類業務屬性對應一個消息隊列,每一個消息隊列對應一個接收模塊,所以選擇PTP的傳遞模式作為系統的消息傳遞模式。
2、系統的實現
(1)建立消息隊列
系統使用JBoss作為J2EE服務器,通過編輯配置文件jbossMQ-destinations-service.xml來定義應用所需的消息隊列地址信息。其部分代碼如下:
<mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.mq.destination:service=Queue,name=RadarQueue"> <depends optional-attribute-name="DestinationManager"> jboss.mq:service=DestinationManager</depends> <depends optional-attribute-name="SecurityManager"> jboss.mq:service=SecurityManager</depends> <attribute name="SecurityConf"> <security> <role name="guest" read="true" write="true"/> <role name="publisher" read="true" write="true" create="false"/> <role name="noacc" read="false" write="false" create="false"/> </security> </attribute> </mbean> |
(2)消息發送模塊的實現
MessageSender SessionBean的功能是構造消息并向指定的消息隊列發送消息,其部分代碼如下:
//構造TEXT消息 message.setText(dataInfo);//設置消息體 //發送消息 Context context = new InitiaContext();//獲取上下文環境 ConnectionFactory connectionFactory = new (ConnectionFactory)context.lookup(“java:comp/env/JNDIConnectionFactory”); Connection connection = connectionFactory.createConnection(); Session session =connection.createSession(false,AUTO_ACKNOWLEDGE); Destination messageQueue = (Destination)context.lookup(“java:comp/env/ RadarQueue”); MessageProducer producer = session.createProducer(messageQueue); producer.send(message); …… |
(3)消息接收模塊的實現
消息接收模塊通過注冊MessageListener接口來監聽消息隊列,實現其onMessage()方法處理消息。其部分代碼如下:
Public class MessageReceiver implements MessageListener{ …… public void onMessage(Message message){ …… dataInfo = ((TextMessage)message).getText();//獲得數據文件信息 …… //調用數據格式轉換模塊 …… } |
結束語
本系統應用JMS異步消息作為分系統與服務器的關聯機制,分系統只需要通過消息向服務器提交任務,之后便不需要關心數據如何處理,何時處理,徹底解開了分系統與服務其的耦合,即保證了數據庫的安全可靠,也減少了分系統的資源消耗。而且系統的各個模塊都是相對獨立,對于今后的變化和進一步的擴展提供了更大的空間。