- 浏览: 479115 次
- 性别:
- 来自: 大连
文章分类
最新评论
-
龘龘龘:
TrueBrian 写道有个问题,Sample 1中,为了控制 ...
What's New on Java 7 Phaser -
龘龘龘:
楼主总结的不错。
What's New on Java 7 Phaser -
TrueBrian:
有个问题,Sample 1中,为了控制线程的启动时机,博主实际 ...
What's New on Java 7 Phaser -
liguanqun811:
不知道楼主是否对zookeeper实现的分布式锁进行过性能测试 ...
Distributed Lock -
hobitton:
mysql的get lock有版本限制,否则get lock可 ...
Distributed Lock
这几天抽空看了看英文版的JMX in Action,本来对于JMX的理解限于一些点,读过之后感觉这些点终于连成网了。首先评论一下这本书的作者Benjamin G. Sullins和Mark B. Whipple。跟有些作者不同,这两位仁兄没有见龙卸甲,舍我其谁的风范,而是充分考虑的读者的智商和承受能力,把书写的比较浅显易懂(我觉得好的程序员在编写程序的时候,也应该具备这样的素质),有的地方甚至很唠叨。由于这本书出版的比较早(2003年),所以里面有些内容也过时了,例如书中对于RMIConnectorServer的介绍还限于SUN的参考实现,目前这部分内容已经成为标准,由JMXConnectorServer统一管理了。另外在这里抱怨一下Model MBean,虽然设计这个MBean的初衷可能是为了避免JMX的侵入性等,但是创建Model MBean的代码实在是有点丑陋。在JMX in Action中,作者介绍了使用JINI来发现MBean Agent的例子,虽然这是JINI的强项,但是个人感觉现在用Multicast作为发现机制的也不少。一时兴起,就写了个Multicast Discovery的例子。首先声明,以下代码没有经过严格测试,设计上可能也有局限。
首先定义一下各种接口:
import java.util.List; public interface DiscoveryAgent { List<DiscoveryService> getDiscoveryServices (); void registerService(DiscoveryService discoveryService); void addDiscoveryListener(DiscoveryListener listener); void removeDiscoveryListener(DiscoveryListener listener); }
public interface DiscoveryService { Object getServiceId(); byte[] toByteArray(); }
public interface DiscoveryServiceFactory { DiscoveryService valueOf(byte data[], int offset, int length); }
public interface DiscoveryListener { void onServiceAdd(DiscoveryAgent agent, DiscoveryService service); void onServiceRemove(DiscoveryAgent agent, DiscoveryService service); }
public class DiscoveryURI { // private String host; private int port; public DiscoveryURI() { } public DiscoveryURI(String host, int port) { this.host = host; this.port = port; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } }
其中DiscoveryAgent中的registerService方法用于注册一个DiscoveryService,这个DiscoveryService会通过multicast(或者其它方式)对网络上的其它DiscoveryAgent公开。当一个DiscoveryService被公开时,首先通过其toByteArray方法得到字节数组,然后将这个字节数组设置到DatagramPacket上,最终通过multicast发送到网络上。getDiscoveryServices方法用于返回目前DiscoveryAgent已经发现的所有DiscoveryService。当一个DiscoveryAgent收到一个DatagramPacket后,首先通过DiscoveryServiceFactory的valueOf方法创建一个DiscoveryService,然后再根据DiscoveryService上的serviceId对这个DiscoveryService进行进一步的处理。
接下来是DiscoveryAgent的实现类:MulticastDiscoveryAgent
import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { // private static Logger logger = Logger.getLogger(MulticastDiscoveryAgent.class); // private static final String DEFAULT_MULTICAST_HOST = "231.0.0.1"; private static final int DEFAULT_MULTICAST_PORT = 9697; private static final int DEFAULT_RECEIVE_BUFFER_SIZE = 8192; private static final int DEFAULT_HEART_BEAT_INTERVAL = 2000; private static final int DEFAULT_HEART_BEAT_MISS_BEFORE_DEATH = 5; // private Thread worker; private long lastHeartBeatTime; private AtomicBoolean workerStarted; private volatile int heartBeatInterval; private volatile int heartBeatMissBeforeDeath; private volatile int receiveBufferSize; private MulticastSocket multicastSocket; // private DiscoveryURI discoveryURI; private DiscoveryService registeredService; private DiscoveryServiceFactory discoveryServiceFactory; private List<DiscoveryListener> discoveryListeners; private List<DiscoveryServiceWrapper> discoveryServices; /** * */ public MulticastDiscoveryAgent() { this.workerStarted = new AtomicBoolean(false); this.receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; this.heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL; this.heartBeatMissBeforeDeath = DEFAULT_HEART_BEAT_MISS_BEFORE_DEATH; this.discoveryURI = new DiscoveryURI(DEFAULT_MULTICAST_HOST, DEFAULT_MULTICAST_PORT); this.discoveryListeners = Collections.synchronizedList(new ArrayList<DiscoveryListener>()); this.discoveryServices = Collections.synchronizedList(new ArrayList<DiscoveryServiceWrapper>()); } /** * */ public boolean isStarted() { return workerStarted.get(); } public boolean start() throws IOException { if (workerStarted.compareAndSet(false, true)) { // InetAddress group = InetAddress.getByName(discoveryURI.getHost()); multicastSocket = new MulticastSocket(discoveryURI.getPort()); multicastSocket.joinGroup(group); multicastSocket.setSoTimeout(heartBeatInterval); multicastSocket.setLoopbackMode(false); multicastSocket.setTimeToLive(1); // worker = new Thread(this); worker.setDaemon(true); worker.start(); // if(logger.isInfoEnabled()) { logger.info("multicast discovery agent started"); } // return true; } else { return false; } } public boolean stop() { if (workerStarted.compareAndSet(true, false)) { // TODO // if(logger.isInfoEnabled()) { logger.info("multicast discovery agent stopped"); } // return true; } else { return false; } } public void run() { try { while(workerStarted.get()) { send(); receive(); } } catch(Exception e) { logger.error("failed to run agent, detail: " + e.toString()); } finally { try { // discoveryServices.clear(); // InetAddress group = InetAddress.getByName(discoveryURI.getHost()); multicastSocket.leaveGroup(group); multicastSocket.close(); multicastSocket = null; } catch(Exception e) { logger.error("failed to close socket, detail: " + e.toString()); } } } private void send() throws IOException { // If got nothing to advertise, just return if(this.registeredService == null) { return; } // long now = System.currentTimeMillis(); if(this.heartBeatInterval + this.lastHeartBeatTime <= now) { this.lastHeartBeatTime = now; byte data[] = this.registeredService.toByteArray(); DatagramPacket packet = new DatagramPacket (data, data.length, InetAddress.getByName (discoveryURI.getHost()), discoveryURI.getPort()); multicastSocket.send(packet); } } private void receive() throws IOException { try { byte[] buffer = new byte[receiveBufferSize]; DatagramPacket packet = new DatagramPacket(buffer, 0, buffer.length); multicastSocket.receive(packet); if (packet.getLength() > 0) { // Parse the packet long now = System.currentTimeMillis(); DiscoveryService ds = discoveryServiceFactory.valueOf(packet.getData(), packet.getOffset(), packet.getLength()); // if(this.registeredService != null && this.registeredService.getServiceId().equals(ds.getServiceId())) { // Ignore } else { // Clean up the discovery services boolean gotIt = false; for(Iterator<DiscoveryServiceWrapper> iter = discoveryServices.iterator(); iter.hasNext(); ) { DiscoveryServiceWrapper dsw = iter.next(); // if(dsw.getDiscoverService().getServiceId().equals(ds.getServiceId())) { dsw.setLastHeartBeatTime(now); gotIt = true; } // Clean up if( dsw.getLastHeartBeatTime() + (getHeartBeatInterval() * getHeartBeatMissBeforeDeath()) < now) { // iter.remove(); // if(logger.isInfoEnabled()) { logger.info("discovery service: " + ds.getServiceId() + " is removed from the agent"); } // Notify listeners for(Iterator<DiscoveryListener> iter2 = discoveryListeners.iterator(); iter2.hasNext(); ) { iter2.next().onServiceRemove(this, dsw.getDiscoverService()); } } } // Add to discovered services if(!gotIt) { // DiscoveryServiceWrapper wrapper = new DiscoveryServiceWrapper(); wrapper.setLastHeartBeatTime(now); wrapper.setDiscoverService(ds); discoveryServices.add(wrapper); // if(logger.isInfoEnabled()) { logger.info("discovery service: " + ds.getServiceId() + " is added to the agent"); } // Notify listeners for(Iterator<DiscoveryListener> iter = discoveryListeners.iterator(); iter.hasNext(); ) { iter.next().onServiceAdd(this, ds); } } } } else { logger.warn("ignored a invalid packet"); } } catch(SocketTimeoutException ste) { // Ignore } } /** * */ public void registerService(DiscoveryService discoveryService) { this.registeredService = discoveryService; } public List<DiscoveryService> getDiscoveryServices() { List<DiscoveryService> r = new ArrayList<DiscoveryService>(); for(Iterator<DiscoveryServiceWrapper> iter = discoveryServices.iterator(); iter.hasNext(); ) { r.add(iter.next().getDiscoverService()); } return r; } public void addDiscoveryListener(DiscoveryListener listener) { if(listener != null && !discoveryListeners.contains(listener)) { discoveryListeners.add(listener); } } public void removeDiscoveryListener(DiscoveryListener listener) { if(listener != null) { discoveryListeners.remove(listener); } } /** * */ public DiscoveryURI getDiscoveryURI() { return discoveryURI; } public void setDiscoveryURI(DiscoveryURI discoveryURI) { this.discoveryURI = discoveryURI; } public DiscoveryServiceFactory getDiscoveryServiceFactory() { return discoveryServiceFactory; } public void setDiscoveryServiceFactory(DiscoveryServiceFactory discoveryServiceFactory) { this.discoveryServiceFactory = discoveryServiceFactory; } public int getReceiveBufferSize() { return receiveBufferSize; } public void setReceiveBufferSize(int receiveBufferSize) { this.receiveBufferSize = receiveBufferSize; } public int getHeartBeatInterval() { return heartBeatInterval; } public void setHeartBeatInterval(int heartBeatInterval) { this.heartBeatInterval = heartBeatInterval; } public int getHeartBeatMissBeforeDeath() { return heartBeatMissBeforeDeath; } public void setHeartBeatMissBeforeDeath(int heartBeatMissBeforeDeath) { this.heartBeatMissBeforeDeath = heartBeatMissBeforeDeath; } /** * */ private static class DiscoveryServiceWrapper { private long lastHeartBeatTime; private DiscoveryService discoverService; public DiscoveryServiceWrapper() { } public long getLastHeartBeatTime() { return lastHeartBeatTime; } public void setLastHeartBeatTime(long lastHeartBeatTime) { this.lastHeartBeatTime = lastHeartBeatTime; } public DiscoveryService getDiscoverService() { return discoverService; } public void setDiscoverService(DiscoveryService discoverService) { this.discoverService = discoverService; } } }
最后是用于测试的代码了, 首先定制一下SimpleDiscoveryService和SimpleDiscoveryServiceFactory,然后执行MulticastDiscoveryAgentTest,从控制台上就可以看到各个agent已经发现的service。执行一段时间后,main函数里会停止mda2,再等一段时间(大概是10秒左右),mda2的公开service就会从mda1、mda3和mda4中移除。mda4上没有注册DiscoveryService,因此mda4也就不会对外DiscoveryService,它的目的就是发现其它agent公开的DiscoveryServcie。
public class SimpleDiscoveryService implements DiscoveryService { // private String serviceId; public SimpleDiscoveryService(String serviceId) { this.serviceId = serviceId; } public Object getServiceId() { return serviceId; } public byte[] toByteArray() { return serviceId.getBytes(); } }
public class SimpleDiscoveryServiceFactory implements DiscoveryServiceFactory { public DiscoveryService valueOf(byte[] data, int offset, int length) { String serviceId = new String(data, offset, length); return new SimpleDiscoveryService(serviceId); } }
import java.util.Iterator; import java.util.List; public class MulticastDiscoveryAgentTest { public static void main(String args[]) { try { // SimpleDiscoveryService sds1 = new SimpleDiscoveryService("service1@localhost"); MulticastDiscoveryAgent mda1 = new MulticastDiscoveryAgent(); mda1.registerService(sds1); mda1.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory()); mda1.start(); SimpleDiscoveryService sds2 = new SimpleDiscoveryService("service2@remotehost"); MulticastDiscoveryAgent mda2 = new MulticastDiscoveryAgent(); mda2.registerService(sds2); mda2.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory()); mda2.start(); SimpleDiscoveryService sds3 = new SimpleDiscoveryService("service3@anotherhost"); MulticastDiscoveryAgent mda3 = new MulticastDiscoveryAgent(); mda3.registerService(sds3); mda3.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory()); mda3.start(); MulticastDiscoveryAgent mda4 = new MulticastDiscoveryAgent(); mda4.registerService(null); mda4.setDiscoveryServiceFactory(new SimpleDiscoveryServiceFactory()); mda4.addDiscoveryListener(new DiscoveryListener() { public void onServiceAdd(DiscoveryAgent agent, DiscoveryService service) { System.out.println("#on service add: " + service.getServiceId()); } public void onServiceRemove(DiscoveryAgent agent, DiscoveryService service) { System.out.println("#on service remove: " + service.getServiceId()); } }); mda4.start(); // int round = 0; while(true) { Thread.sleep(3000); // System.out.println("\n#round: " + round); System.out.println("services@agent1: " + getServiceIds(mda1)); System.out.println("services@agent2: " + getServiceIds(mda2)); System.out.println("services@agent3: " + getServiceIds(mda3)); System.out.println("services@agent4: " + getServiceIds(mda4)); // round++; if(round > 3 && mda2 != null) { mda2.stop(); mda2 = null; System.out.println("\n#agent2 stopped, time: " + System.currentTimeMillis()); } } } catch(Exception e) { e.printStackTrace(); } } private static String getServiceIds(MulticastDiscoveryAgent mda) { if(mda == null) { return ""; } StringBuffer r = new StringBuffer(); List<DiscoveryService> discoveryServices = mda.getDiscoveryServices(); for(Iterator<DiscoveryService>iter = discoveryServices.iterator(); iter.hasNext(); ) { DiscoveryService ds = iter.next(); r.append(ds.getServiceId()); if(iter.hasNext()) { r.append(","); } } return r.toString(); } }
通过这个例子,可以在JMX中方便地将connctor server的属性对外公开,以于查找发现。另外,使用DiscoveryService的模块可能会在DiscoveryAgent之前发现这个DiscoveryService已经失效,所以可以考虑在DiscoveryAgent接口上再增加一个方法,用于DiscoveryService的客户模块强制的将DiscoveryService从DiscoveryAgent中移除。
发表评论
-
Understanding the Hash Array Mapped Trie
2012-03-30 10:36 0mark -
A Hierarchical CLH Queue Lock
2012-01-14 19:01 2114A Hierarchical CLH Queue Lock ( ... -
Inside AbstractQueuedSynchronizer (4)
2012-01-08 17:06 3473Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (3)
2012-01-07 23:37 4607Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (2)
2012-01-07 17:54 6311Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (1)
2012-01-06 11:04 7891Inside AbstractQueuedSynchroniz ... -
Code Optimization
2011-10-14 00:11 1564当前开发人员在进行编码的时候,可能很少关注纯粹代码级别的优化了 ... -
Distributed Lock
2011-08-02 22:02 91371 Overview 在分布式系统中,通常会 ... -
What's New on Java 7 Phaser
2011-07-29 10:15 81471 Overview Java 7的并 ... -
Sequantial Lock in Java
2011-06-07 17:00 21741 Overview Linux内核中常见的同步机 ... -
Feature or issue?
2011-04-26 22:23 121以下代码中,为何CglibTest.intercept ... -
Bloom Filter
2010-10-19 00:41 50251 Overview Bloom filt ... -
Inside java.lang.Enum
2010-08-04 15:40 64091 Introduction to enum J ... -
Open Addressing
2010-07-07 17:59 34051 Overview Open addressi ... -
JLine
2010-06-17 09:11 10954Overview JLine 是一个用来处理控 ... -
ID Generator
2010-06-14 14:45 1639关于ID Generator,想 ... -
inotify-java
2009-07-22 22:58 82131 Overview 最近公 ... -
Perf4J
2009-06-11 23:13 84341 Overview Perf4j是一个用于计算 ... -
Progress Estimator
2009-02-22 19:37 1489Jakarta Commons Cookbook这本书 ... -
jManage
2008-12-22 00:40 39161 Overview 由于项目需要, 笔者开发了一个 ...
相关推荐
The Service Interface for Requesting IP Multicast Reception . 9 4. Multicast Listening State Maintained by Nodes . . . . . . . . 11 5. Message Formats . . . . . . . . . . . . . . . . . . . . . . . ...
The Simple Service Discovery Protocol (SSDP) provides a mechanism where by network clients, with little or... for multicast discovery support as well as server based notification and discovery routing.
这是小弟制作的MLDv2的pps檔,有兴趣的人可以叁考看看.
A novel and efficient source-path discovery and maintenance method for application layer multicast
Multicast Listener Discovery version 2 headers.
* MLD (Multicast listener discovery for IPv6). Aims to be compliant with RFC 2710. No support for MLDv2 * ND (Neighbor discovery and stateless address autoconfiguration for IPv6). Aims to be ...
组播源发现协议MSDP(Multicast Source Discovery Protocol)是基于多个PIM-SM (Protocol Independent Multicast-Sparse Mode)域互连而开发的一种域间组播解决方 案,目前只支持IPv4。
组播源发现协议MSDP(Multicast Source Discovery Protocol)是基于多个PIM-SM (Protocol Independent Multicast-Sparse Mode)域互连而开发的一种域间组播解决方 案,目前只支持IPv4。
MLD(Multicast Listener Discovery)组播侦听者发现协议,是负责IPv6组播成员管理的 协议,用来在IPv6主机和与其直连的组播设备之间建立、维护组播组成员关系。
MLD(Multicast Listener Discovery)组播侦听者发现协议,是负责IPv6组播成员管理的 协议,用来在IPv6主机和与其直连的组播设备之间建立、维护组播组成员关系。
LLC/SNAP encapsulation, VC multiplexing, AAL 5, Neighbor Discovery, Addresses Auto-configuration, ICMP Redirect, MARS (Multicast Address Resolution Server), NHRP (Next Hop Resolution Protocol), ...
7 Multicast Listener Discovery and MLD Version 2. . . . . . 171 8 Address Autoconfiguration . . . . . . . . . 191 9 IPv6 and Name Resolution . . . . . . . . . 209 10 IPv6 Routing . . . . . . . . . . ....
* MLD (Multicast listener discovery for IPv6). Aims to be compliant with RFC 2710. No support for MLDv2 * ND (Neighbor discovery and stateless address autoconfiguration for IPv6). Aims to be ...
Elasticsearch默认使用: elasticsearch作为集群名称多播发现 # cluster.name: elasticsearchcluster.name: handson# discovery.zen.ping.multicast.enabled: falsediscovery.zen.ping.multicast.enabl
通过对MLDv2(multicast listener discovery)和PIM-SSM(protocol independent multicast-source specific multicast)协议进行扩展,实现业务自适应源特定组播;通过扩展资源预约协议(resource reservation p
Originally reactive protocols were not design for the characteristic of highly mobility during route discovery. Due to dynamically modification to the VANET this changes very often due to breakdown ...
RO14 - Introducing the DiscoveryClient... RO15 - Behind the Scenes: Inside... RO16 - Class Factories Overview RO17 - Manipulation of RODL Meta Data RO18 - RODL - The Service Description... RO19 - ...
Efficient Ethernet Link Layer Discovery Protocol Supports Loop Detection Security Filtering Disable learning for each port Disable learning-table aging for each port Drop unknown...
scalecube服务 微服务2.0 一个开放源代码项目,致力于简化由开发人员为开发人员构建的可伸缩的微服务响应式系统的响应式编程。 ScaleCube Services提供了低延迟的Reactive Microservices库,用于基于八卦协议的对等...
本地服务发现 (LSD) 提供了一种类似 SSDP(http over udp-multicast)的机制来向本地邻居宣布特定群的存在。 该模块由。 安装 npm install bittorrent-lsd 例子 const opts = { peerId: new Buffer('...