5.1 案例1:监听服务器动态上下线

需求

某分布式系统中,节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到节点服务器的上下线


需求分析

这个案例涉及到 3 个角色:

  1. 分布式服务器 A.(我们需要监视动态上下线的机器)
  2. Zookeeper 服务器 B
  3. 客户端(需要实时的感受到分布式服务器上下线情况的) C

分析:

分布式服务器 A 上下线的时候, 我们需要在客户端 C 上收到分布式服务器 A 上下线的通知.

很显眼, 分布式服务器 A 上下线的通知应该由 Zookeeper 服务器 B 来发出.

让客户端 C 监听 Zookeeper 服务器 B 中节点(znode)的变化, 当节点发生变化的, C 可以得到通知.

znode怎么变化呢? 当分布式服务器 A 上线的时候在 Zookeeper 服务器上注册暂时节点, 当分布式服务器下线的时候暂时节点会自动删除.

对 Zookeeper 服务器B来说, 分布式服务器 A 和客户端 C 来说, AC他们都是客户端.

创建一个节点/servers, 临时节点作为它的子节点.

运行在分布式服务器 A 上的代码:

package com.atguigu.zkcase;

import org.apache.zookeeper.*;

import java.io.IOException;

public class DistributeServer {
    private static final String connString = "hadoop201:2181,hadoop202:2181,hadoop203:2181";
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        /*1. 创建 zookeeper 客户端对象 通过参数来模拟哪个服务器上线了*/
        ZooKeeper zk = new ZooKeeper(connString, 2000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {

            }
        });
        /*2. 在zookeeper服务器中创建临时节点, 包含这个服务器的信息*/
        zk.create("/servers/" + args[0], args[0].getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        //进程休眠状态, 模拟当前客户端一直在连接到zookeeper服务器.
        Thread.sleep(Long.MAX_VALUE);
    }
}

运行中客户端 C 上的代码

package com.atguigu.zkcase;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

public class DistributeClient {
    private static final String connString = "hadoop201:2181,hadoop202:2181,hadoop203:2181";

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        /*1. 创建 zookeeper客户端对象*/
        ZooKeeper zk = new ZooKeeper(connString, 2000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
            }
        });
        /*2. 监听 /servers/hadoop201... 节点来感知分布式服务器的上线或下线, 需要循环监听*/
        for (int i = 1; i < 204; i++) {
            String znodePath = "/servers/hadoop20"  + i;
            zk.exists(znodePath, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        if(event.getType() == Event.EventType.NodeCreated){
                            System.out.println(event.getPath() + " 上线了");
                        }else if(event.getType() == Event.EventType.NodeDeleted){
                            System.out.println(event.getPath() + " 下线了");
                        }
                        zk.exists(znodePath, this); // 循环监听
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}
Copyright © 尚硅谷大数据 & 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2018-10-18 11:15:51

results matching ""

    No results matching ""