`
whitesock
  • 浏览: 479115 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

Multicast Discovery

    博客分类:
  • SE
阅读更多

   这几天抽空看了看英文版的JMX in Action,本来对于JMX的理解限于一些点,读过之后感觉这些点终于连成网了。首先评论一下这本书的作者Benjamin G. Sullins和Mark B. Whipple。跟有些作者不同,这两位仁兄没有见龙卸甲,舍我其谁的风范,而是充分考虑了读者的智商和承受能力,把书写得比较浅显易懂(我觉得好的程序员在编写程序的时候,也应该具备这样的素质),有的地方甚至很唠叨。由于这本书出版得比较早(2003年),所以里面有些内容也过时了,例如书中对于RMIConnectorServer的介绍还限于SUN的参考实现,目前这部分内容已经成为标准,由JMXConnectorServer统一管理了。另外在这里抱怨一下Model MBean,虽然设计这个MBean的初衷可能是为了避免JMX的侵入性等,但是创建Model MBean的代码实在是有点丑陋。在JMX in Action中,作者介绍了使用JINI来发现MBean Agent的例子,虽然这是JINI的强项,但是个人感觉现在用Multicast作为发现机制的也不少。一时兴起,就写了个Multicast Discovery的例子。首先声明,以下代码没有经过严格测试,设计上可能也有局限。

   首先定义一下各种接口:

import java.util.List;

/**
 * An agent that publishes a local {@link DiscoveryService} to other agents and
 * keeps track of the services published by them.
 */
public interface DiscoveryAgent {

	/**
	 * Registers the service this agent should publish to the other agents.
	 *
	 * @param discoveryService the service to publish
	 */
	void registerService(DiscoveryService discoveryService);

	/**
	 * Returns the services this agent has discovered so far.
	 *
	 * @return the currently known discovered services
	 */
	List<DiscoveryService> getDiscoveryServices();

	/**
	 * Subscribes a listener to service add/remove notifications.
	 *
	 * @param listener the listener to add
	 */
	void addDiscoveryListener(DiscoveryListener listener);

	/**
	 * Unsubscribes a previously added listener.
	 *
	 * @param listener the listener to remove
	 */
	void removeDiscoveryListener(DiscoveryListener listener);
}
/**
 * A service that can be advertised by a discovery agent.
 */
public interface DiscoveryService {

	/**
	 * Returns the identifier agents use to tell services apart.
	 *
	 * @return the service id
	 */
	Object getServiceId();

	/**
	 * Serializes this service into the bytes that are put on the wire.
	 *
	 * @return the encoded form of this service
	 */
	byte[] toByteArray();
}
public interface DiscoveryServiceFactory {
	
	DiscoveryService valueOf(byte data[], int offset, int length);
	
} 
/**
 * Callback interface notified when an agent adds or removes a discovered service.
 */
public interface DiscoveryListener {

	/**
	 * Called when {@code agent} has added {@code service} to its discovered services.
	 *
	 * @param agent   the agent reporting the change
	 * @param service the service that was added
	 */
	void onServiceAdd(DiscoveryAgent agent, DiscoveryService service);

	/**
	 * Called when {@code agent} has removed {@code service} from its discovered services.
	 *
	 * @param agent   the agent reporting the change
	 * @param service the service that was removed
	 */
	void onServiceRemove(DiscoveryAgent agent, DiscoveryService service);
}
/**
 * Mutable host/port pair describing where discovery traffic is exchanged.
 */
public class DiscoveryURI {
	// Host name or address; null until set
	private String host;
	// Port number; 0 until set
	private int port;

	/**
	 * Creates a URI with no host and port 0.
	 */
	public DiscoveryURI() {
		this(null, 0);
	}

	/**
	 * Creates a URI for the given host and port.
	 *
	 * @param host the host name or address
	 * @param port the port number
	 */
	public DiscoveryURI(String host, int port) {
		setHost(host);
		setPort(port);
	}

	/** @return the host name or address */
	public String getHost() {
		return this.host;
	}

	/** @return the port number */
	public int getPort() {
		return this.port;
	}

	/** @param host the host name or address */
	public void setHost(String host) {
		this.host = host;
	}

	/** @param port the port number */
	public void setPort(int port) {
		this.port = port;
	}
}

   其中DiscoveryAgent中的registerService方法用于注册一个DiscoveryService,这个DiscoveryService会通过multicast(或者其它方式)对网络上的其它DiscoveryAgent公开。当一个DiscoveryService被公开时,首先通过其toByteArray方法得到字节数组,然后将这个字节数组设置到DatagramPacket上,最终通过multicast发送到网络上。getDiscoveryServices方法用于返回目前DiscoveryAgent已经发现的所有DiscoveryService。当一个DiscoveryAgent收到一个DatagramPacket后,首先通过DiscoveryServiceFactory的valueOf方法创建一个DiscoveryService,然后再根据DiscoveryService上的serviceId对这个DiscoveryService进行进一步的处理。   

   接下来是DiscoveryAgent的实现类:MulticastDiscoveryAgent

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;

/**
 * {@link DiscoveryAgent} that advertises one registered service and discovers
 * services of other agents over IP multicast.
 *
 * <p>A single daemon worker thread alternates between sending a heart beat for the
 * registered service (if any), receiving one packet (blocking for at most the heart
 * beat interval that was configured when the agent was started) and expiring services
 * whose heart beats have been missing for
 * {@code heartBeatInterval * heartBeatMissBeforeDeath} milliseconds.
 *
 * <p>The multicast group, port and receive timeout are read once in {@link #start()};
 * changing {@link #setDiscoveryURI(DiscoveryURI)} or the heart beat interval while the
 * agent is running does not affect the open socket. Listeners are invoked on the worker
 * thread, outside of any internal lock.
 */
public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
	//
	private static final Logger logger = Logger.getLogger(MulticastDiscoveryAgent.class);
	
	//
	private static final String DEFAULT_MULTICAST_HOST = "231.0.0.1";
	private static final int DEFAULT_MULTICAST_PORT = 9697;
	private static final int DEFAULT_RECEIVE_BUFFER_SIZE = 8192;
	private static final int DEFAULT_HEART_BEAT_INTERVAL = 2000;
	private static final int DEFAULT_HEART_BEAT_MISS_BEFORE_DEATH = 5;
	// Lower bound, in milliseconds, for how long stop() waits for the worker to finish
	private static final long MIN_STOP_WAIT = 1000L;
	
	// Worker state. lastHeartBeatTime is only touched by the worker thread.
	private volatile Thread worker;
	private long lastHeartBeatTime;
	private final AtomicBoolean workerStarted;
	private volatile int heartBeatInterval;
	private volatile int heartBeatMissBeforeDeath;
	private volatile int receiveBufferSize;
	// Socket, group and port of the current run; published by start() before the worker is started
	private volatile MulticastSocket multicastSocket;
	private volatile InetAddress multicastGroup;
	private volatile int multicastPort;
	
	// Configuration that may be replaced from other threads while the worker is running
	private volatile DiscoveryURI discoveryURI;
	private volatile DiscoveryService registeredService;
	private volatile DiscoveryServiceFactory discoveryServiceFactory;
	// Both lists are synchronized wrappers; compound operations and iteration lock on the list itself
	private final List<DiscoveryListener> discoveryListeners;
	private final List<DiscoveryServiceWrapper> discoveryServices;
	
	
	/**
	 * Creates a stopped agent configured with the default multicast address, port,
	 * buffer size and heart beat settings.
	 */
	public MulticastDiscoveryAgent() {
		this.workerStarted = new AtomicBoolean(false);
		this.receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
		this.heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL;
		this.heartBeatMissBeforeDeath = DEFAULT_HEART_BEAT_MISS_BEFORE_DEATH;
		this.discoveryURI = new DiscoveryURI(DEFAULT_MULTICAST_HOST, DEFAULT_MULTICAST_PORT);
		this.discoveryListeners = Collections.synchronizedList(new ArrayList<DiscoveryListener>());
		this.discoveryServices = Collections.synchronizedList(new ArrayList<DiscoveryServiceWrapper>());
	}
	
	/**
	 * Tells whether the agent is currently started.
	 *
	 * @return {@code true} between a successful {@link #start()} and the next
	 *         {@link #stop()} or failure of the worker
	 */
	public boolean isStarted() {
		return workerStarted.get();
	}
	
	/**
	 * Opens the multicast socket, joins the configured group and starts the worker thread.
	 *
	 * @return {@code true} if the agent was started by this call, {@code false} if it
	 *         was already started
	 * @throws IOException if the socket cannot be created or configured; the agent is
	 *         left stopped so that start() can be retried
	 */
	public boolean start() throws IOException {
		if (!workerStarted.compareAndSet(false, true)) {
			return false;
		}
		
		MulticastSocket socket = null;
		try {
			//
			DiscoveryURI uri = discoveryURI;
			InetAddress group = InetAddress.getByName(uri.getHost());
			socket = new MulticastSocket(uri.getPort());
			socket.joinGroup(group);
			socket.setSoTimeout(heartBeatInterval);
			// The argument means "disable loopback": false keeps local loopback enabled,
			// which is why receive() has to ignore the agent's own heart beats
			socket.setLoopbackMode(false);
			socket.setTimeToLive(1);
			
			// Publish the run state before the worker starts so that it sees all of it
			multicastGroup = group;
			multicastPort = uri.getPort();
			multicastSocket = socket;
			Thread thread = new Thread(this);
			thread.setDaemon(true);
			worker = thread;
			thread.start();
		} catch (IOException e) {
			abortStart(socket);
			throw e;
		} catch (RuntimeException e) {
			abortStart(socket);
			throw e;
		}
		
		//
		if(logger.isInfoEnabled()) {
			logger.info("multicast discovery agent started");
		}
		return true;
	}
	
	/**
	 * Requests the worker to stop and waits a bounded time for it to finish. The worker
	 * closes the socket itself when it leaves its loop. When called from the worker
	 * thread (for example inside a listener) the call returns without waiting.
	 *
	 * @return {@code true} if the agent was stopped by this call, {@code false} if it
	 *         was not started
	 */
	public boolean stop() {
		if (!workerStarted.compareAndSet(true, false)) {
			return false;
		}
		
		// The worker notices the flag after at most one receive timeout
		Thread thread = worker;
		if (thread != null && thread != Thread.currentThread()) {
			try {
				thread.join(Math.max(MIN_STOP_WAIT, 2L * heartBeatInterval));
			} catch (InterruptedException e) {
				// Preserve the interrupt for the caller; the worker still terminates on its own
				Thread.currentThread().interrupt();
			}
		}
		
		//
		if(logger.isInfoEnabled()) {
			logger.info("multicast discovery agent stopped");
		}
		return true;
	}
	
	/**
	 * Worker loop; intended to be executed only by the thread created in {@link #start()}.
	 * On exit the discovered services are cleared and the socket of this run is closed.
	 */
	public void run() {
		// Work on the socket of this run only, so a later start() cannot be affected by this thread
		final MulticastSocket socket = multicastSocket;
		final InetAddress group = multicastGroup;
		final int port = multicastPort;
		try {
			while(isActiveWorker()) {
				send(socket, group, port);
				receive(socket);
				expireServices(System.currentTimeMillis());
			}
		} catch(Exception e) {
			logger.error("failed to run agent, detail: " + e.toString(), e);
			// The worker is dying unexpectedly: reflect that so the agent can be started again
			if (Thread.currentThread() == worker) {
				workerStarted.compareAndSet(true, false);
			}
		} finally {
			//
			discoveryServices.clear();
			
			//
			if (socket != null) {
				try {
					socket.leaveGroup(group);
				} catch(Exception e) {
					logger.error("failed to close socket, detail: " + e.toString(), e);
				}
				// Always close, even if leaving the group failed
				socket.close();
				if (multicastSocket == socket) {
					multicastSocket = null;
				}
			}
		}
	}
	
	/**
	 * Closes a partially set up socket and returns the agent to the stopped state.
	 */
	private void abortStart(MulticastSocket socket) {
		if (socket != null) {
			socket.close();
		}
		multicastSocket = null;
		worker = null;
		workerStarted.set(false);
	}
	
	/**
	 * Tells whether the calling thread is the current worker and the agent is started.
	 */
	private boolean isActiveWorker() {
		return workerStarted.get() && Thread.currentThread() == worker;
	}
	
	/**
	 * Sends a heart beat for the registered service if one is due.
	 */
	private void send(MulticastSocket socket, InetAddress group, int port) throws IOException {
		// If got nothing to advertise, just return. Read the field once: it may be replaced concurrently.
		DiscoveryService service = this.registeredService;
		if(service == null) {
			return;
		}
		
		//
		long now = System.currentTimeMillis();
		if(this.heartBeatInterval + this.lastHeartBeatTime <= now) {
			this.lastHeartBeatTime = now;
			byte data[] = service.toByteArray();
			socket.send(new DatagramPacket(data, data.length, group, port));
		}
	}
	
	/**
	 * Waits for one packet (up to the socket timeout) and records the service it announces.
	 */
	private void receive(MulticastSocket socket) throws IOException {
		byte[] buffer = new byte[receiveBufferSize];
		DatagramPacket packet = new DatagramPacket(buffer, 0, buffer.length);
		try {
			socket.receive(packet);
		} catch(SocketTimeoutException ste) {
			// Nothing arrived in time; the caller still gets a chance to send and expire
			return;
		}
		
		if (packet.getLength() <= 0) {
			logger.warn("ignored a invalid packet");
			return;
		}
		
		// Parse the packet
		DiscoveryService ds = parse(packet);
		if (ds == null) {
			return;
		}
		Object serviceId = ds.getServiceId();
		
		// Ignore our own heart beat
		DiscoveryService local = this.registeredService;
		if (local != null && serviceId.equals(local.getServiceId())) {
			return;
		}
		
		// Refresh a known service or add a new one
		long now = System.currentTimeMillis();
		boolean gotIt = false;
		synchronized (discoveryServices) {
			for (DiscoveryServiceWrapper dsw : discoveryServices) {
				if (serviceId.equals(dsw.getDiscoverService().getServiceId())) {
					dsw.setLastHeartBeatTime(now);
					gotIt = true;
					break;
				}
			}
			if (!gotIt) {
				DiscoveryServiceWrapper wrapper = new DiscoveryServiceWrapper();
				wrapper.setLastHeartBeatTime(now);
				wrapper.setDiscoverService(ds);
				discoveryServices.add(wrapper);
			}
		}
		
		if (!gotIt) {
			if(logger.isInfoEnabled()) {
				logger.info("discovery service: " + serviceId + " is added to the agent");
			}
			
			// Notify listeners outside the lock
			for (DiscoveryListener listener : snapshotListeners()) {
				try {
					listener.onServiceAdd(this, ds);
				} catch (RuntimeException e) {
					logger.error("discovery listener failed, detail: " + e.toString(), e);
				}
			}
		}
	}
	
	/**
	 * Decodes a received packet. Packets come from the network, so anything that cannot
	 * be decoded is logged and dropped instead of terminating the worker.
	 *
	 * @return the decoded service with a non-null id, or {@code null} if the packet was dropped
	 */
	private DiscoveryService parse(DatagramPacket packet) {
		DiscoveryServiceFactory factory = this.discoveryServiceFactory;
		if (factory == null) {
			logger.warn("ignored a packet because no discovery service factory is set");
			return null;
		}
		
		try {
			DiscoveryService ds = factory.valueOf(packet.getData(), packet.getOffset(), packet.getLength());
			if (ds == null || ds.getServiceId() == null) {
				logger.warn("ignored a packet without service id");
				return null;
			}
			return ds;
		} catch (RuntimeException e) {
			logger.warn("ignored a malformed packet, detail: " + e.toString(), e);
			return null;
		}
	}
	
	/**
	 * Removes every discovered service whose last heart beat is older than
	 * {@code heartBeatInterval * heartBeatMissBeforeDeath} and notifies the listeners.
	 * Runs on every worker iteration so that services also expire when no packets arrive.
	 */
	private void expireServices(long now) {
		// long arithmetic: the product of two ints may overflow
		long maxSilence = (long) getHeartBeatInterval() * getHeartBeatMissBeforeDeath();
		List<DiscoveryService> expired = new ArrayList<DiscoveryService>();
		synchronized (discoveryServices) {
			for(Iterator<DiscoveryServiceWrapper> iter = discoveryServices.iterator(); iter.hasNext(); ) {
				DiscoveryServiceWrapper dsw = iter.next();
				if (dsw.getLastHeartBeatTime() + maxSilence < now) {
					iter.remove();
					expired.add(dsw.getDiscoverService());
				}
			}
		}
		
		if (expired.isEmpty()) {
			return;
		}
		
		// Log and notify outside the lock
		DiscoveryListener[] listeners = snapshotListeners();
		for (DiscoveryService removed : expired) {
			if(logger.isInfoEnabled()) {
				logger.info("discovery service: " + removed.getServiceId() + " is removed from the agent");
			}
			for (DiscoveryListener listener : listeners) {
				try {
					listener.onServiceRemove(this, removed);
				} catch (RuntimeException e) {
					logger.error("discovery listener failed, detail: " + e.toString(), e);
				}
			}
		}
	}
	
	/**
	 * Returns a copy of the registered listeners, so callbacks can be made without holding the list lock.
	 */
	private DiscoveryListener[] snapshotListeners() {
		return discoveryListeners.toArray(new DiscoveryListener[0]);
	}

	/**
	 * Sets the service advertised by this agent.
	 *
	 * @param discoveryService the service to advertise, or {@code null} to only discover
	 */
	public void registerService(DiscoveryService discoveryService) {
		this.registeredService = discoveryService;
	}
	
	/**
	 * Returns a snapshot of the services discovered so far.
	 *
	 * @return a new list that the caller may modify freely
	 */
	public List<DiscoveryService> getDiscoveryServices() {
		List<DiscoveryService> r = new ArrayList<DiscoveryService>();
		// Iterating a synchronized list requires holding its lock
		synchronized (discoveryServices) {
			for (DiscoveryServiceWrapper dsw : discoveryServices) {
				r.add(dsw.getDiscoverService());
			}
		}
		return r;
	}
	
	/**
	 * Adds a listener unless it is {@code null} or already registered.
	 *
	 * @param listener the listener to add
	 */
	public void addDiscoveryListener(DiscoveryListener listener) {
		if (listener == null) {
			return;
		}
		// Check and add atomically so that a listener is never registered twice
		synchronized (discoveryListeners) {
			if (!discoveryListeners.contains(listener)) {
				discoveryListeners.add(listener);
			}
		}
	}

	/**
	 * Removes a listener; {@code null} and unknown listeners are ignored.
	 *
	 * @param listener the listener to remove
	 */
	public void removeDiscoveryListener(DiscoveryListener listener) {
		if(listener != null) {
			discoveryListeners.remove(listener);
		}
	}
	
	/**
	 * @return the multicast address and port used by the next {@link #start()}
	 */
	public DiscoveryURI getDiscoveryURI() {
		return discoveryURI;
	}

	/**
	 * @param discoveryURI the multicast address and port used by the next {@link #start()}
	 */
	public void setDiscoveryURI(DiscoveryURI discoveryURI) {
		this.discoveryURI = discoveryURI;
	}
	
	/**
	 * @return the factory used to decode received packets
	 */
	public DiscoveryServiceFactory getDiscoveryServiceFactory() {
		return discoveryServiceFactory;
	}

	/**
	 * @param discoveryServiceFactory the factory used to decode received packets
	 */
	public void setDiscoveryServiceFactory(DiscoveryServiceFactory discoveryServiceFactory) {
		this.discoveryServiceFactory = discoveryServiceFactory;
	}
	
	/**
	 * @return the size in bytes of the buffer allocated for each received packet
	 */
	public int getReceiveBufferSize() {
		return receiveBufferSize;
	}

	/**
	 * @param receiveBufferSize the size in bytes of the buffer allocated for each received packet
	 */
	public void setReceiveBufferSize(int receiveBufferSize) {
		this.receiveBufferSize = receiveBufferSize;
	}
	
	/**
	 * @return the heart beat interval in milliseconds
	 */
	public int getHeartBeatInterval() {
		return heartBeatInterval;
	}

	/**
	 * @param heartBeatInterval the heart beat interval in milliseconds
	 */
	public void setHeartBeatInterval(int heartBeatInterval) {
		this.heartBeatInterval = heartBeatInterval;
	}
	
	/**
	 * @return the number of heart beat intervals a service may stay silent before it is removed
	 */
	public int getHeartBeatMissBeforeDeath() {
		return heartBeatMissBeforeDeath;
	}

	/**
	 * @param heartBeatMissBeforeDeath the number of heart beat intervals a service may
	 *        stay silent before it is removed
	 */
	public void setHeartBeatMissBeforeDeath(int heartBeatMissBeforeDeath) {
		this.heartBeatMissBeforeDeath = heartBeatMissBeforeDeath;
	}
	
	/**
	 * A discovered service together with the time its last heart beat was received.
	 */
	private static class DiscoveryServiceWrapper {
		private long lastHeartBeatTime;
		private DiscoveryService discoverService;
		
		public DiscoveryServiceWrapper() {
			
		}

		public long getLastHeartBeatTime() {
			return lastHeartBeatTime;
		}

		public void setLastHeartBeatTime(long lastHeartBeatTime) {
			this.lastHeartBeatTime = lastHeartBeatTime;
		}

		public DiscoveryService getDiscoverService() {
			return discoverService;
		}

		public void setDiscoverService(DiscoveryService discoverService) {
			this.discoverService = discoverService;
		}
	}
}

   最后是用于测试的代码了, 首先定制一下SimpleDiscoveryService和SimpleDiscoveryServiceFactory,然后执行MulticastDiscoveryAgentTest,从控制台上就可以看到各个agent已经发现的service。执行一段时间后,main函数里会停止mda2,再等一段时间(大概是10秒左右),mda2的公开service就会从mda1、mda3和mda4中移除。mda4上没有注册DiscoveryService,因此mda4也就不会对外公开DiscoveryService,它的目的就是发现其它agent公开的DiscoveryService。

/**
 * Minimal {@link DiscoveryService} whose whole state is a string id. The wire format
 * is the id encoded as UTF-8, matching {@code SimpleDiscoveryServiceFactory}.
 */
public class SimpleDiscoveryService implements DiscoveryService {
	// Fixed wire charset: the platform default may differ between the hosts exchanging packets
	private static final java.nio.charset.Charset WIRE_CHARSET = java.nio.charset.Charset.forName("UTF-8");
	
	//
	private final String serviceId;
	
	/**
	 * @param serviceId the id identifying this service
	 */
	public SimpleDiscoveryService(String serviceId) {
		this.serviceId = serviceId;
	}
	
	/**
	 * @return the string id passed to the constructor
	 */
	public Object getServiceId() {
		return serviceId;
	}

	/**
	 * @return the service id encoded as UTF-8
	 */
	public byte[] toByteArray() {
		return serviceId.getBytes(WIRE_CHARSET);
	}
}
/**
 * {@link DiscoveryServiceFactory} that decodes the UTF-8 encoded id produced by
 * {@link SimpleDiscoveryService#toByteArray()}.
 */
public class SimpleDiscoveryServiceFactory implements DiscoveryServiceFactory {
	// Must match the charset used by SimpleDiscoveryService.toByteArray()
	private static final java.nio.charset.Charset WIRE_CHARSET = java.nio.charset.Charset.forName("UTF-8");

	/**
	 * Decodes a service id from a region of a byte array.
	 *
	 * @param data   the array holding the encoded id
	 * @param offset index of the first encoded byte in {@code data}
	 * @param length number of encoded bytes
	 * @return a service carrying the decoded id
	 */
	public DiscoveryService valueOf(byte[] data, int offset, int length) {
		String serviceId = new String(data, offset, length, WIRE_CHARSET);
		return new SimpleDiscoveryService(serviceId);
	}
}
import java.util.Iterator;
import java.util.List;

/**
 * Manual demo: starts four agents in one JVM (three advertising a service, one only
 * listening), prints what each agent has discovered every three seconds and stops the
 * second agent after a few rounds. Runs until the process is killed.
 */
public class MulticastDiscoveryAgentTest {
	public static void main(String args[]) {
		try {
			// Three advertising agents
			MulticastDiscoveryAgent mda1 = newAgent(new SimpleDiscoveryService("service1@localhost"));
			mda1.start();
			
			MulticastDiscoveryAgent mda2 = newAgent(new SimpleDiscoveryService("service2@remotehost"));
			mda2.start();
			
			MulticastDiscoveryAgent mda3 = newAgent(new SimpleDiscoveryService("service3@anotherhost"));
			mda3.start();
			
			// A fourth agent that advertises nothing and only reports what it discovers
			MulticastDiscoveryAgent mda4 = newAgent(null);
			mda4.addDiscoveryListener(new DiscoveryListener() {

				public void onServiceAdd(DiscoveryAgent agent, DiscoveryService service) {
					System.out.println("#on service add: " + service.getServiceId());
				}

				public void onServiceRemove(DiscoveryAgent agent, DiscoveryService service) {
					System.out.println("#on service remove: " + service.getServiceId());
				}
			});
			mda4.start();
			
			// Report forever
			int round = 0;
			for(;;) {
				Thread.sleep(3000);
				
				//
				System.out.println("\n#round: " + round);
				System.out.println("services@agent1: " + getServiceIds(mda1));
				System.out.println("services@agent2: " + getServiceIds(mda2));
				System.out.println("services@agent3: " + getServiceIds(mda3));
				System.out.println("services@agent4: " + getServiceIds(mda4));
				
				// Stop the second agent once, after the fourth report
				round++;
				if(mda2 == null || round <= 3) {
					continue;
				}
				mda2.stop();
				mda2 = null;
				System.out.println("\n#agent2 stopped, time: " + System.currentTimeMillis());
			}
		} catch(Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * Creates an agent that advertises the given service (may be null) and decodes
	 * packets with a SimpleDiscoveryServiceFactory. The agent is not started.
	 */
	private static MulticastDiscoveryAgent newAgent(DiscoveryService service) {
		MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent();
		agent.registerService(service);
		agent.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory());
		return agent;
	}
	
	/**
	 * Joins the ids of the services an agent has discovered with commas; an empty
	 * string is returned for a null agent.
	 */
	private static String getServiceIds(MulticastDiscoveryAgent mda) {
		if(mda == null) {
			return "";
		}
		
		StringBuilder joined = new StringBuilder();
		boolean first = true;
		for(DiscoveryService service : mda.getDiscoveryServices()) {
			if(!first) {
				joined.append(",");
			}
			joined.append(service.getServiceId());
			first = false;
		}
		return joined.toString();
	}
}

   通过这个例子,可以在JMX中方便地将connector server的属性对外公开,以便于查找发现。另外,使用DiscoveryService的模块可能会在DiscoveryAgent之前发现这个DiscoveryService已经失效,所以可以考虑在DiscoveryAgent接口上再增加一个方法,用于DiscoveryService的客户模块强制的将DiscoveryService从DiscoveryAgent中移除。

4
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics