博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zookeeper学习【3】服务发现
阅读量:4704 次
发布时间:2019-06-10

本文共 10332 字,大约阅读时间需要 34 分钟。

服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。

实现代码如下:

import com.alibaba.fastjson.JSON;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.exception.ZkNoNodeException;/** * 代表工作服务器 */public class WorkServer {    private ZkClient zkClient;    // ZooKeeper    private String configPath;    // ZooKeeper集群中servers节点的路径    private String serversPath;    // 当前工作服务器的基本信息    private ServerData serverData;    // 当前工作服务器的配置信息    private ServerConfig serverConfig;    private IZkDataListener dataListener;    public WorkServer(String configPath, String serversPath,                      ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {        this.zkClient = zkClient;        this.serversPath = serversPath;        this.configPath = configPath;        this.serverConfig = initConfig;        this.serverData = serverData;        this.dataListener = new IZkDataListener() {            public void handleDataDeleted(String dataPath) throws Exception {            }            public void handleDataChange(String dataPath, Object data)                    throws Exception {                String retJson = new String((byte[])data);                ServerConfig serverConfigLocal = (ServerConfig) JSON.parseObject(retJson,ServerConfig.class);                updateConfig(serverConfigLocal);                System.out.println("new Work server config is:"+serverConfig.toString());            }        };    }    // 启动服务器    public void start() {        System.out.println("work server start...");        initRunning();    }    // 停止服务器    public void stop() {        System.out.println("work server stop...");        zkClient.unsubscribeDataChanges(configPath, dataListener); // 取消监听config节点    }    // 服务器初始化    private void initRunning() {        registMe(); // 注册自己        zkClient.subscribeDataChanges(configPath, dataListener); // 订阅config节点的改变事件    }    // 启动时向zookeeper注册自己的注册函数    private void registMe() {        String mePath = serversPath.concat("/").concat(serverData.getAddress());        try {            zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)                    .getBytes());        } catch (ZkNoNodeException e) {            zkClient.createPersistent(serversPath, true);            registMe();        }    }    // 更新自己的配置信息    private void updateConfig(ServerConfig serverConfig) {        this.serverConfig = serverConfig;    }}
/** * 调度类 */public class SubscribeZkClient {    private static final int  CLIENT_QTY = 5; // Work Server数量    private static final String  ZOOKEEPER_SERVER = "192.168.1.105:2181";    private static final String  CONFIG_PATH = "/config";    private static final String  COMMAND_PATH = "/command";    private static final String  SERVERS_PATH = "/servers";    public static void main(String[] args) throws Exception {        List
clients = new ArrayList
(); List
workServers = new ArrayList
(); ManageServer manageServer = null; try { // 创建一个默认的配置 ServerConfig initConfig = new ServerConfig(); initConfig.setDbPwd("123456"); initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb"); initConfig.setDbUser("root"); // 实例化一个Manage Server ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer()); manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig); manageServer.start(); // 启动Manage Server // 创建指定个数的工作服务器 for ( int i = 0; i < CLIENT_QTY; ++i ) { ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer()); clients.add(client); ServerData serverData = new ServerData(); serverData.setId(i); serverData.setName("WorkServer#"+i); serverData.setAddress("192.168.1."+i); WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig); workServers.add(workServer); workServer.start(); // 启动工作服务器 } System.out.println("敲回车键退出!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); } finally { System.out.println("Shutting down..."); for ( WorkServer workServer : workServers ) { try { workServer.stop(); } catch (Exception e) { e.printStackTrace(); } } for ( ZkClient client : clients ) { try { client.close(); } catch (Exception e) { e.printStackTrace(); } } } }}
/** * 服务器基本信息 */public class ServerData {    private String address;    private Integer id;    private String name;    public String getAddress() {        return address;    }    public void setAddress(String address) {        this.address = address;    }    public Integer getId() {        return id;    }    public void setId(Integer id) {        this.id = id;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }    @Override    public String toString() {        return "ServerData [address=" + address + ", id=" + id + ", name="                + name + "]";    }}
/** * 配置信息 */public class ServerConfig {    private String dbUrl;    private String dbPwd;    private String dbUser;    public String getDbUrl() {        return dbUrl;    }    public void setDbUrl(String dbUrl) {        this.dbUrl = dbUrl;    }    public String getDbPwd() {        return dbPwd;    }    public void setDbPwd(String dbPwd) {        this.dbPwd = dbPwd;    }    public String getDbUser() {        return dbUser;    }    public void setDbUser(String dbUser) {        this.dbUser = dbUser;    }    @Override    public String toString() {        return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd                + ", dbUser=" + dbUser + "]";    }}
import com.alibaba.fastjson.JSON;import org.I0Itec.zkclient.IZkChildListener;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.exception.ZkNoNodeException;import org.I0Itec.zkclient.exception.ZkNodeExistsException;import java.util.List;public class ManageServer {    // zookeeper的servers节点路径    private String serversPath;    // zookeeper的command节点路径    private String commandPath;    // zookeeper的config节点路径    private String configPath;    private ZkClient zkClient;    private ServerConfig config;    // 用于监听servers节点的子节点列表的变化    private IZkChildListener childListener;    // 用于监听command节点数据内容的变化    private IZkDataListener dataListener;    // 工作服务器的列表    private List
workServerList; public ManageServer(String serversPath, String commandPath, String configPath, ZkClient zkClient, ServerConfig config) { this.serversPath = serversPath; this.commandPath = commandPath; this.zkClient = zkClient; this.config = config; this.configPath = configPath; this.childListener = new IZkChildListener() { public void handleChildChange(String parentPath, List
currentChilds) throws Exception { // TODO Auto-generated method stub workServerList = currentChilds; // 更新内存中工作服务器列表 System.out.println("work server list changed, new list is "); execList(); } }; this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { // TODO Auto-generated method stub // ignore; } public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub String cmd = new String((byte[]) data); System.out.println("cmd:"+cmd); exeCmd(cmd); // 执行命令 } }; } private void initRunning() { zkClient.subscribeDataChanges(commandPath, dataListener); zkClient.subscribeChildChanges(serversPath, childListener); } /* * 1: list 2: create 3: modify */ private void exeCmd(String cmdType) { if ("list".equals(cmdType)) { execList(); } else if ("create".equals(cmdType)) { execCreate(); } else if ("modify".equals(cmdType)) { execModify(); } else { System.out.println("error command!" + cmdType); } } // 列出工作服务器列表 private void execList() { System.out.println(workServerList.toString()); } // 创建config节点 private void execCreate() { if (!zkClient.exists(configPath)) { try { zkClient.createPersistent(configPath, JSON.toJSONString(config) .getBytes()); } catch (ZkNodeExistsException e) { zkClient.writeData(configPath, JSON.toJSONString(config) .getBytes()); // config节点已经存在,则写入内容就可以了 } catch (ZkNoNodeException e) { String parentDir = configPath.substring(0, configPath.lastIndexOf('/')); zkClient.createPersistent(parentDir, true); execCreate(); } } } // 修改config节点内容 private void execModify() { // 我们随意修改config的一个属性就可以了 config.setDbUser(config.getDbUser() + "_modify"); try { zkClient.writeData(configPath, JSON.toJSONString(config).getBytes()); } catch (ZkNoNodeException e) { execCreate(); // 写入时config节点还未存在,则创建它 } } // 启动工作服务器 public void start() { initRunning(); } // 停止工作服务器 public void stop() { zkClient.unsubscribeChildChanges(serversPath, childListener); zkClient.unsubscribeDataChanges(commandPath, dataListener); }}

 

转载于:https://www.cnblogs.com/tinyj/p/10029130.html

你可能感兴趣的文章
在单位上班的25条建议(建议收藏)
查看>>
web前端--http协议
查看>>
欧拉定理证明&阶乘的逆元
查看>>
Prime Game Gym - 101981J(网络流/二分图)
查看>>
Teamwork Gym - 101492E (dp)
查看>>
No Link, Cut Tree! Gym - 101484F(dp)
查看>>
Coprimes Gym - 101492C(bitset)
查看>>
Partial Tree UVALive - 7190(完全背包)
查看>>
『深度应用』NLP机器翻译深度学习实战课程·零(基础概念)
查看>>
『开发技术』Windows极简安装使用face_recognition实现人脸识别
查看>>
『深度应用』NLP命名实体识别(NER)开源实战教程
查看>>
『开发技术』GPU训练加速原理(附KerasGPU训练技巧)
查看>>
『深度应用』NLP机器翻译深度学习实战课程·壹(RNN base)
查看>>
『深度应用』一小时教你上手MaskRCNN·Keras开源实战(Windows&Linux)
查看>>
『王霸之路』从0.1到2.0一文看尽TensorFlow奋斗史
查看>>
系统测试中需要注意的点
查看>>
Elasticsearch TermQuery 详解
查看>>
一个困扰了我N久的bug , android.enableAapt2=false 无效
查看>>
查看客户端的IP地址,机器名,MAC地址,登陆名等信息
查看>>
移动端经常遇到的小bug
查看>>