top
Loading...
分布式對話服務器的管理

摘要:

通過使用JDK 1.3中引入的RMI和Proxy API,本篇文章討論了一種允許一臺或多臺servlet服務器在一臺或多臺對話服務器上維護對話信息的技術,采用這種技術后,單一點故障就不會再出現了。

如果系統中有一臺或多臺servlet服務器,對話信息只存在于運行著JVM的一臺servlet服務器上,而不會被傳輸給其他servlet服務器。如果該servlet服務器當機或因為維護而被關機,任何保存在對話中的信息都會丟失。如果一個系統中有多臺servlet服務器,一個帶有對話的用戶需要訪問對話中的任何信息,都需要被重新定向到同一臺servlet服務器。曾經有專家建議采用關系數據庫保存所有的對話信息,但這仍然存在單一點故障的危險,那就是運行關系數據庫的服務器。而且如果數據庫出了故障,所有的servlet服務器就都不能再訪問對話信息了。另外,在數據庫中保存可串行化的對象在有些數據庫中是比較難以實現的。

多服務器對話管理的另一個可能的途徑是利用JavaSpaces API來維護對話對象中的記錄。當然,如果運行JavaSpaces的服務器由于維護或故障而被關機,也會丟失所有的對話信息,我們再一次遇到了單一點故障的問題。

要實現帶有N個節點的分布式對話服務器,我們必須解決如下的三個問題:

━━如何建立一個庫來存貯對話信息。

━━如何對分布式對話信息存貯庫進行同步。

━━在一個對話信息存貯庫脫離網絡后,如何使該服務器從下一個對話信息存貯庫中訪問對話信息。


Mnemosyne的簡介

我們用來存貯對話信息的庫是Mnemosyne界面的執行。執行Mnemosyne的對象負責管理對話信息存貯庫中的所有對象,任何試圖寫、訪問或刪除庫中對象的對象都必須調用Mnemosyne的相應的方法來實現相應的操作。

一個對象要存貯在Mnemosyne,就必須執行Memory界面,該界面定義了equalsMemory()操作來探測二個內存對象是否相同,這就使 Mnemosyne判斷出應當把哪個對象返回給read要求或take要求。Memory界面也可以進行串行化擴充,以便我們可以用RMI在網絡上傳輸該對象。

Mnemosyne使用三種界面表達其狀態。

1、CommonContext界面存貯Mnemosyne的全部信息。每個Mnemosyne都有一個CommonContext對象的實例,以便read、write、take Memory對象時在各個方法之間進行同步。在write或者take說Memory對象時,CommonContext對象既定義“silent”方法也定義“loud”方法,當在不進行事件通知的情況下添加對象時,就會用到“silent”方法。例如,當Mnemosyne對象接收到WriteRemoteEvent(向一個遠程Mnemosyne對象寫對象的告示。)事件后,如果它也希望向CommonContext對象寫另一個對象,它就無需通知其他的遠程CommonContext對象,最初的Mnemosyne已經通知了它們,因此這種寫是通過調用CommonContext對象的silentWrite()方法“靜悄悄”地完成的。而“loud”方法則在有對象第一次被添加時將這一事件的詳細信息通知所有被調用的監聽對象。

2、Transaction對象用于在read、write、take Memory對象時進行分布式事務處理,這意味在Mnemosyne對象上可能會有多步驟的操作。

3、TransactionContext界面管理分布式事務,使得系統可以中止或提交一個事務。

保持Mnemosynes的同步是通過synchronize()和notify()這二個由 Mnemosyne定義的方法完成的。synchronize()可以使一個本地的Mnemosyne與其他Mnemosyne的Vector保持同步(這些Mnemosyne可以是本地的或遠程的,為簡明起見,我們假定它們是遠程的。)下面是synchronize()的一個例子:


public void synchronize(Vector Mnemosynes)
throws RemoteException, TransactionException
{

file:// MatchAllMemory對象是一個有效類
file://對任何二個Memory對象進行比較
MatchAllMemory matchAllMemory = new MatchAllMemory();

file:// 從Primary中獲得所有Memory
Mnemosyne Mnemosyne = (Mnemosyne) Mnemosynes.firstElement();
Vector allMemories = Mnemosyne.readAll(matchAllMemory,null);


commonContext.silentWriteAll(allMemories);

// 注冊進行發送、接收事件
Enumeration enumeration = Mnemosynes.elements();
while(enumeration.hasMoreElements())
{
Mnemosyne nextMnemosyne = (Mnemosyne) enumeration.nextElement();

file://注冊接收通知
nextMnemosyne.addTakeRemoteEventListener(this, matchAllMemory);
nextMnemosyne.addWriteRemoteEventListener(this, matchAllMemory);

file:// 注冊發送通知
addTakeRemoteEventListener(nextMnemosyne, matchAllMemory);
addWriteRemoteEventListener(nextMnemosyne, matchAllMemory);
}

// ...
}


本地的Mnemosyne對象讀取Vector中第一個Mnemosyne對象的所有Memory對象,并采用“silent”方法將它們寫到其CommonContext對象中。然后,本地Mnemosyne將自己作為TakeRemoteEventListener和WriteRemoteListener添加到所有的遠程Mnemosyne中,這就意味著任何對遠程Mnemosynes的take或read操作都將調用本地Mnemosyne的notify()方法。最后,本地Mnemosyne將遠程Mnemosyne添加到其TakeRemoteEventListeners和WriteRemoteListeners隊列中,確保對本地Mnemosyne的write或take操作都會通知遠程Mnemosyne。

當添加或刪除一個Memory對象時,經過同步的本地Mnemosyne對象需要對所有的Mnemosyne進行更新,可以通過notify()方法來完成這一任務。無論是發生write或take事件,Mnemosyne都會針對發生的事件調用適當的監聽者的notify()方法。在synchronize()方法中,我們把本地Mnemosyne注冊為所有遠程Mnemosyne的take和write事件的監聽者,一旦遠程Mnemosyne上有take和write事件發生,就會調用本地 Mnemosyne的notify()方法。然后,本地Mnemosyne必須對事件作出反應。下面是Mnemosyne如何與遠程Mnemosyne進行同步的例子:


public void notify(RemoteEvent remoteEvent) throws RemoteException
{
// 回寫被寫的內存,但無需通知所有的Mnemosyne
if(remoteEvent instanceof WriteRemoteEvent)
{
WriteRemoteEvent wre = (WriteRemoteEvent) remoteEvent;
commonContext.silentWrite(wre.getMemory());
}

file:// 取被寫的Memory,但無需通知所有的Mnemosyne
if(remoteEvent instanceof TakeRemoteEvent)
{
TakeRemoteEvent tre = (TakeRemoteEvent) remoteEvent;
commonContext.silentTake(tre.getMemory());
}
}


現在已經創建了一個控制所有memory對象的Mnemosyne,它自動與遠程Mnemosyne保持同步,如果任何一個遠程Mnemosynes得到或失去一個Memory對象時,都可以使它保持最新的狀態。

要通過Mnemosyne管理HTTP對話,servlet需要創建HttpSession的實例(從HttpServletRequest中使用getSession()),在實現Memory對象的類中封裝對話,并調用Mnemosyne對象的write()方法把封裝類寫到一個Mnemosyne中。

通過調用write()方法,封裝著對話的Memory對象沿著網絡傳送給Mnemosyne,并通知遠程機器。當對象被寫到Mnemosyne時,WriteRemoteEvent被發送給在Mnemosyne上注冊的所有WriteRemoteEventListeners,這樣,所有其他的Mnemosynes就能將新的對象作為Mnemosynes添加到它們的對話信息存貯庫中。

要對存貯的對話進行查詢,servlet調用read()方法查找包含對話的Memory對象,如果Mnemosyne找到了要查找的對象,則該對象通過RMI返回到servlet服務器。

最后,要刪除對話,servlet就會調用Mnemosyne的take()方法,Mnemosyne將象有read事件發生那樣退還Memory對象,同時從其存貯對象庫中刪除該Memory對象。同時,向其所有TakeRemoteEventListeners發送TakeRemoteEvent事件,通知所有的遠程Mnemosynes該Memory對象已經被刪除了。

建立對話服務器

上面我們已經討論了如何在多服務器上維護對話存貯庫,下面我們將討論如何建立對話服務器。在初始化過程中,對話服務器完成下列任務:

━━創建本地Mnemosyne對象。

━━把本地Mnemosyne綁定到RMI。

━━把本地Mnemosyne與其他的遠程Mnemosyne進行同步。

首先,對話服務器將獲得Mnemosyne對象的一個實例,該實例被綁定到對話服務器的本地IP上。


protected void bindMnemosyne()
{

file://得到Mnemosyne
Mnemosyne Mnemosyne = null;
try
{
Mnemosyne = MnemosyneFactory.getMnemosyne();
}
catch(RemoteException remoteException)
{
System.out.println("Internal error:");
System.out.println("Can't create a Mnemosyne");
System.exit(1);
}
// 把Mnemosyne綁定到MnemosyneImpl
try
{
String rmiURL = "//" + _localIP + "/MnemosyneImpl";
Naming.rebind(rmiURL, Mnemosyne);
}
catch(ArrayIndexOutOfBoundsException ArrayIndexOutOfBoundsException)
{
throw new IllegalArgumentException("LocalIP is invalid");
}
catch(MalformedURLException malformedURLException)
{
throw new IllegalArgumentException("LocalIP is invalid");
}
catch(RemoteException remoteException)
{
System.out.println("Internal error:");
System.out.println("Can't rebind a Mnemosyne to MnemosyneImpl");
System.exit(1);
}

}


通過把本地Mnemosyne上一系列代表RMI名字符號的URL賦予遠程對話服務器,就能引發同步操作,這些URL存貯在一個被稱作rmiURL的字符串數組中。在SessionServer的符號中,URL是作為參數從命令行命令中獲得的,但它可以來自其他渠道:


protected void synchronizeMnemosyne()
{
file://獲得本地Mnemosyne
Mnemosyne localMnemosyne = null;
try
{
localMnemosyne = (Mnemosyne) Naming.lookup(_localIP);
}
catch(Exception exception)
{
System.out.println("Internal error:");
System.out.println("Can't lookup local MnemosyneImpl");
System.exit(1);
}

file://獲得同步用的遠程Mnemosynes
Vector remoteMnemosynes = new Vector();

// _rmiURLS對象是代表需要進行同步的遠程服務器的字符串數組
for(int index = 1;index < _rmiURLS.length;index++)
{
try
{
remoteMnemosynes.add(Naming.lookup(_rmiURLS[index]));
}
catch(Exception exception)
{
}
}

file:// 同步
try
{
if(remoteMnemosynes.size() > 1)
localMnemosyne.synchronize(remoteMnemosynes);
}
catch(Exception exception)
{
System.out.println("Internal error:");
System.out.println("Can't synchronize local MnemosyneImpl");
System.exit(1);
}
}


遠程訪問Mnemosyne

下面我們來討論在servlet服務器上訪問遠程Mnemosyne的方法。要在無需特定服務器在線的情況下加載一個包含對話信息的Mnemosyne,需要創建一個FailoverHandler的實例,FailoverHandler利用JDK 1.3中的Proxy API處理對話服務器當機的問題。FailoverHandler把一個代表訪問遠程對話服務器的RMI URL的字符串數組作為參數,然后,從Proxy類中獲取Mnemosyne實例。下面的SessionManager類中的initializeMnemosyne()方法可以顯示出這一切是如何完成的:


public static void initializeMnemosyne(String[] rmiURLs)
{
// 設置當機服務器的處理程序
FailoverHandler fh = new FailoverHandler(null, rmiURLs);

// 得到Mnemosyne. 的一個實例
_Mnemosyne =
(Mnemosyne)Proxy.newProxyInstance(Mnemosyne.class.getClassLoader(),
new Class[] { Mnemosyne.class },
fh );
}


如果用Proxy類獲取Mnemosyne的實例,所有的方法調用必須通過FailoverHandler的 invoke()方法進行。當有方法訪問Mnemosyne時,FailoverHandler將試著調用該方法訪問一個遠程對象。如果方法調用失敗(例如服務器關機),FailoverHandler將從提供給構造器的URL清單中再取得下一個URL,這樣就會無縫地轉向下一個對話服務器。


// 建立遠程加載類的URL清單
public FailoverHandler(Remote delegate, String[] delegateURLS)
{
this.delegateURLS = delegateURLS;

// 如果這個URL無效,則獲取下一個有效的URL
try {
this.delegate =
((delegate == null)?getNextValidDelegate():delegate);
} catch (RemoteException ex) {
// 如果發生遠程意外錯誤,則該URL不能使用,向調用者發送一個 //IllegalArgumentException事件
throw new IllegalArgumentException("Remote URLs could not "
+ "be found");
}

}

public Object invoke(Object proxy,
Method method,
Object[] arguments)
throws Throwable
{
while(true)
{
try
{
file:// 嘗試對獲得的最后一個URL調用被調用的方法
return method.invoke(delegate, arguments);
}
catch(InvocationTargetException invocationTargetException)
{
file://如果獲得的URL無效,則取下一個URL
try
{
throw invocationTargetException.getTargetException();
}
catch(RemoteException remoteException)
{
delegate = getNextValidDelegate();
}
}
}
}

file://從構造器中的URL清單中獲得下一個URL
protected Remote getNextValidDelegate() throws RemoteException
{
for(int i = 0; i < delegateURLS.length;i++)
{
try
{
return Naming.lookup(delegateURLS[i]);
}
catch(Exception exception)
{
}
}

throw new RemoteException("All lookup failed");
}

當使用FailoverHandler對象時,從一個對話服務器向另一個對話服務器的轉換對于調用Mnemosyne的任何用戶端機器都是透明的。

盡管我們已經能夠訪問對話服務器,而且可以避免單一點故障,我們還必須為HttpSession建立一個封裝對象,而SessionWrapper就是這樣一個對象,而且,它還假定HttpSession的執行也是串行化的。如果它不是串行化的,可以很方便地修改封裝對象將對話的信息轉移到一個哈希表中并在其他成員變量中保留其他信息(ID、創作時間等信息。)。


public interface SessionWrapper extends Memory
{
/**
* 得到HttpSession的信息。
*/
public HttpSession getSession();
}

public class SessionWrapperImpl implements SessionWrapper
{

/**識別該對話的關鍵字 */
protected String _id;
/** 當前HttpSession的信息。 */
protected HttpSession _sess;

/**
* 建立ID,但沒有建立對話的其他信息,可用于通過read尋找一個對話。
*/
public SessionWrapper(String id) {
_id = id;
}

/**
* 建立一個帶對話的SessionWrapper。其ID與對話的ID相同。
*/
public SessionWrapper(HttpSession sess) {
_sess = sess;
_id = sess.getId();
}

/**
* 如果Memory對象是SessionWrapper的一個實例,當前的SessionWrapper
* 已經建立了與對象相同的ID,則此方法返回的值為真。
*/
public boolean equalsMemory(Memory m) {
return (m instanceof SessionWrapper
&& _id != null
&& _id.equals(((SessionWrapper)m)._id));
}

/**
* 得到HttpSession的信息。
*/
public HttpSession getSession() {
return _sess;
}
}


SessionWrapper類執行了Memory的界面,因此,HttpSession對象的ID可以與遠程對話的ID進行比較。

最后需要創建read()、write()和delete(),以對遠程對話進行管理。我們向SessionManager類添加三個靜態類:


/**
* 從在初始化時建立的Mnemosyne中得到HttpSession信息。
*/
public static HttpSession getSession(String id)
throws RemoteException
{
try {
SessionWrapper result
= (SessionWrapper)_Mnemosyne.read(new SessionWrapper(id),
null);
return result.getSession();
} catch (TransactionException ex) {
// 由于沒有處理事物,因此不會有事務意外被放棄。
ex.printStackTrace();
}
return null;
}

/**
* 在初始化時指定的Mnemosyne中保存對話信息。
*/
public static void saveSession(HttpSession sess)
throws RemoteException
{
try {
_Mnemosyne.write(new SessionWrapper(sess), null);
} catch (TransactionException ex) {
file://由于沒有處理事物,因此不會有事務意外被放棄。
ex.printStackTrace();
}
}

/**
* 從在初始化時指定的Mnemosyne中刪除一個對話。
*/
public static void removeSession(String id)
throws RemoteException
{
try {
_Mnemosyne.take(new SessionWrapper(id), null);
} catch (TransactionException ex) {
// /由于沒有處理事物,因此不會有事務意外被放棄
ex.printStackTrace();
}
}

在servlet中,可以以如下方式管理對話

public void init(ServletConfig conf) throws ServletException {
// 調用一個方法得到指示對話服務器位置的RMI URL清單
// 例如://server1.foo.com/MnemosyneImpl, //server2.foo.com/MnemosyneImpl,等
String[] urls = getURLs(conf); // Method to get the URLs from properties for the session servers
SessionManager.initializeMnemosyne(urls)
}

public void doGet(HttpServletRequest req, HttpServletResponse rsp) throws IOException {

file:// 得到存貯在cookie中的對話,僅僅是為了得到其ID。
HttpSession baseSess = req.getSession()
file://根據得到的ID,從Mnemosyne中得到真正的對話
HttpSession realSess = SessionManager.getSession(base.getId());

SessionManager.saveSession(realSess);
}


結論

盡管這篇文章討論了一個分布式對話管理的例子,但我們可以將這一技術用于管理必須容忍任一節點出錯的分布式內存管理系統中。Mnemosyne還可以用在成員不斷加入和離開的P2P應用中。通過使用Mnemosyne,任何一個成員都可以與系統進行快速同步,而無需要求為保持系統有效而必須保證某一結點不出故障。

作者:http://www.zhujiangroad.com
來源:http://www.zhujiangroad.com
北斗有巢氏 有巢氏北斗