邵东做网站的公司太原seo关键词排名优化
Zookeeper 原生客户端
zookeeper 官方提供的 java 客户端 API。
ZkClient
开源的 zk 客户端,在原生 API 基础上封装,是一个更易于使用的 zookeeper 客户端。
Curator
开源的 zk 客户端,在原生 API 基础上封装,apache 顶级项目。
推荐使用 Curator,支持 lambda 表达式,链式操作,还有事务管理,且封装了常用的功能。
Zookeeper 原生客户端
maven pom 依赖:
org.apache.zookeeper
zookeeper
3.6.2
// 创建会话
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
// 创建节点
public String void create(final String path, byte data[], List acl, CreateMode createMode, StringCallback cb, Object ctx)
// 读取数据
public List void getChildren(final String path, Watcher watcher, Stat stat, Children2Callback cb, Object ctx)
public List void getData(final String path, Watcher watcher, Stat stat, DataCallback cb, Object ctx)
// 更新数据
public Stat void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
// 检测节点是否存在
public Stat void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
// 权限控制
public void addAuthInfo(String scheme, byte auth[])
// watch
org.apache.zookeeper.Watcher
增删改查 - CRUD
@Slf4j
public class ZKDemo {
private final static String CONNECTIONS_TR = "192.168.132.129:2181";
public void crud() {
// 等待 ZooKeeper 连接完毕,再进行操作,所以使用 CountDownLatch 进行控制。
CountDownLatch countDownLatch = new CountDownLatch(1);
// new ZooKeeper(连接,超时时间,Watcher),watcher 相当于触发器。
try (ZooKeeper zooKeeper = new ZooKeeper(CONNECTIONS_TR, 500, event -> {
// ZooKeeper 连接完毕,countDownLatch 放行。
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
log.debug("节点发生了变化,路径:" + event.getPath());
}
})) {
// 等待 ZooKeeper 连接完毕
countDownLatch.await();
log.debug("当前连接状态是:" + zooKeeper.getState().toString());
// 创建一个节点。create(节点路径, 节点值, ACL 权限, 节点类型)
zooKeeper.create("/node_java_1", "value".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 获取数据。getData(节点路径, Watcher, Stat)
byte[] data = zooKeeper.getData("/node_java_1", true, new Stat());
log.debug("节点的值:" + new String(data));
// 修改数据。setData(节点路径, 节点值, 版本控制),-1 表示不做版本控制
// 修改之后会触发 watcher,因为上面 getData 中的 watch 参数为 true
// 注意这个 watch 是一次性的,如果进行第二次 setData,就不会触发 watcher 了,需要在 setData 之前再写一遍 getData,并让 watch 参数为 true
zooKeeper.setData("/node_java_1", "value2".getBytes(), -1);
// 不会触发 watcher 了
zooKeeper.setData("/node_java_1", "value3".getBytes(), -1);
// 这样才能触发 watcher
zooKeeper.getData("/node_java_1", true, new Stat());
zooKeeper.setData("/node_java_1", "value4".getBytes(), -1);
// 获取子节点。getChildren(节点路径, Watcher)
List children = zooKeeper.getChildren("/node_java_1", true);
log.debug("/node_java_1 的子节点:" + children);
// 删除节点。delete(节点路径, 版本控制),-1 表示不做版本控制
zooKeeper.delete("/node_java_1", -1);
} catch (InterruptedException | IOException | KeeperException e) {
e.printStackTrace();
}
}
}
ACL
@Slf4j
public class ZKDemo {
private final static String CONNECTIONS_TR = "192.168.132.129:2181";
public void acl() {
CountDownLatch countDownLatch = new CountDownLatch(1);
try (ZooKeeper zooKeeper = new ZooKeeper(CONNECTIONS_TR, 500, event -> {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
log.debug("连接完毕:" + event.getState() + " --> " + event.getType());
}
})) {
countDownLatch.await();
// 创建 ACL
ACL acl_all_digest = new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("root:root")));
ACL acl_create_ip = new ACL(ZooDefs.Perms.CREATE, new Id("ip", "192.168.1.1"));
List acl = Arrays.asList(acl_all_digest, acl_create_ip);
zooKeeper.create("/auth1", "123".getBytes(), acl, CreateMode.PERSISTENT);
// 如果是另一个连接,需要重新执行这句话。因为这个权限是会话级的。
zooKeeper.addAuthInfo("digest", "root:root".getBytes());
zooKeeper.create("/auth1/auth1-1", "123".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
} catch (IOException | InterruptedException | NoSuchAlgorithmException | KeeperException e) {
e.printStackTrace();
}
}
}
弊端
会话的连接是异步的
Watch 需要重复注册
Session 重连机制
如果不用 CountDownLatch 的话,有时能连上,有时连不上。
开发复杂性较高
ZkClient
com.101tec
zkclient
0.11
// 创建会话(同步,重试)
public ZkClient(final String zkServers, final int sessionTimeout,
final int connectionTimeout, final ZkSerializer zkSerializer,
final long operationRetryTimeout)
// 创建节点(同步,递归创建)
public String create(String path,Object data,final List acl,CreateMode mode)
public void createPersistent(String path,boolean createParents,List acl)
public void createPersistent(String path, Object data, List acl)
public String createPersistentSequential(String path,Object data,List acl)
public void createEphemeral(String path, Object data, List acl)
public String createEphemeralSequential(String path,Object data,List acl)
// 删除节点(同步,递归删除)
public boolean delete(String path,int version)
public boolean deleteRecursive(String path)
// 获取节点(同步,避免不存在异常)
public List getChildren(String path)
public T readData(String path, boolean returnNullIfPathNotExists)
public T readData(String path, Stat stat)
// 更新节点(同步,实现CAS,状态返回)
public void writeData(String path, Object datat, int expectedVersion)
public Stat writeDataReturnStat(String path,Object datat,int expectedVersion)
// 检测节点存在(同步)
public boolean exists(String path)
// 权限控制(同步)
public void addAuthInfo(String scheme, final byte[] auth);
public void setAcl(final String path, final List acl);
// 监听器
IZkStateListener (un)subscribeStateChanges(IZkStateListener listener)
IZkDataListener (un)subscribeDataChanges(IZkStateListener listener)
IZkChildListener (un)subscribeChildChanges(IZkStateListener listener)
增删改查
@Slf4j
public class ZKClientDemo {
// 逗号分隔,链接集群。
private final static String CONNECTIONS_TR = "192.168.199.128:2181";
public void crud() {
// 同步连接,如果显示超时,可以把超时时间调大再试
ZkClient zkClient = new ZkClient(CONNECTIONS_TR, 60000);
log.debug(zkClient + " -> success.");
// 级联创建节点
zkClient.createPersistent("/node-1/node-1-1", true);
// 获取子节点
List list = zkClient.getChildren("/node-1");
log.debug("/node-1 的子节点:" + list);
// Watcher 监听不是一次性的
// /node-1 节点数据内容(value)变化触发
zkClient.subscribeDataChanges("/node-1", new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
log.debug("节点名称:" + s + "->节点修改后的值" + o);
}
@Override
public void handleDataDeleted(String s) throws Exception {
}
});
zkClient.writeData("/node-1", "node");
zkClient.writeData("/node-1", "node2");
// /node-1 节点变化触发
zkClient.subscribeChildChanges("/node-1", (s, node1_list) ->
log.debug("节点名称:" + s + "->" + "当前的节点列表:" + node1_list));
// 删除节点
zkClient.delete("/node-1/node-1-1");
// 级联删除节点
zkClient.deleteRecursive("/node-1");
}
}
Curator
org.apache.curator
curator-framework
5.1.0
// 创建会话(同步,重试)
CuratorFrameworkFactory.newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
CuratorFrameworkFactory.builder().connectString("192.168.11.56:2180")
.sessionTimeoutMs(30000).connectionTimeoutMs(30000)
.canBeReadOnly(false)
.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.build();
// retryPolicy 连接策略:
// * RetryOneTime: 只重连一次.
// * RetryNTime: 指定重连的次数N.
// * RetryUtilElapsed: 指定最大重连超时时间和重连时间间隔,间歇性重连直到超时或者链接成功.
// * ExponentialBackoffRetry: 基于"backoff"方式重连,和 RetryUtilElapsed 的区别是重连的时间间隔是动态的.
// * BoundedExponentialBackoffRetry: 同 ExponentialBackoffRetry,增加了最大重试次数的控制.
// 创建节点
client.create().creatingParentIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(aclList)
.forPath(path, "hello, zk".getBytes());
// 删除节点
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).forPath(path)
// 获取节点
client.getData().storingStatIn(stat).forPath(path);
client.getChildren().forPath(path);
// 更新节点
client.setData().withVersion(version).forPath(path, data)
// 判断节点是否存在
client.checkExists().forPath(path);
// 设置权限
Build.authorization(String scheme, byte[] auth)
client.setACL().withVersion(version)
.withACL(ZooDefs.Ids.CREATOR_ALL_ACL)
.forPath(path);
// 监听器(避免反复监听)
// * Cache 是 curator 中对事件监听的包装,对事件的监听可以近似看做是本地缓存视图和远程 zk 视图的对比过程
// * NodeCache 节点缓存用于处理节点本身的变化 ,回调接口 NodeCacheListener
// * PathChildrenCache 子节点缓存用于处理节点的子节点变化,回调接口 PathChildrenCacheListener
// * TreeCache NodeCache 和 PathChildrenCache 的结合体,回调接口 TreeCacheListener
// 事务支持(保证一组操作的原子性)
Collection results = client.transaction().forOperations(operations);
// 异步支持
// 引入BackgroundCallback接口,用于处理异步接口调用之后服务端返回的结果信息
public void processResult(CuratorFramework client, CuratorEvent event)
// * CuratorEventType 事件类型
// * org.apache.zookeeper.KeeperException.Code 服务器响应码(标识结果)
链接
@Slf4j
public class CuratorDemo {
private final static String CONNECTIONS_TR = "192.168.199.128:2181";
public CuratorFramework getInstance() {
return CuratorFrameworkFactory.builder()
.connectString(CONNECTIONS_TR)
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
}
}
增删改查
@Slf4j
public class CuratorDemo {
public void crud() {
CuratorFramework curatorFramework = getInstance();
curatorFramework.start();
log.debug("链接成功");
// 创建节点
try {
String result = curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/curator/curator1/curator11", "123".getBytes());
log.debug("创建节点:" + result);
} catch (Exception e) {
e.printStackTrace();
}
// 查询
try {
Stat stat = new Stat();
byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/curator11");
log.debug("查询:" + new String(bytes) + "-->stat:" + stat);
} catch (Exception e) {
e.printStackTrace();
}
// 更新
try {
Stat stat = curatorFramework.setData().forPath("/curator11", "123321".getBytes());
log.debug("更新:" + stat);
} catch (Exception e) {
e.printStackTrace();
}
// 删除节点
try { //默认情况下,version为 -1
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/curator");
} catch (Exception e) {
e.printStackTrace();
}
}
}
异步操作
@Slf4j
public class CuratorDemo {
public void asynchronous() {
CuratorFramework curatorFramework = getInstance();
curatorFramework.start();
log.debug("链接成功");
// 异步操作
ExecutorService service = Executors.newFixedThreadPool(1);
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).
inBackground((curatorFramework1, curatorEvent) -> {
log.debug(Thread.currentThread().getName() + "->resultCode:" + curatorEvent.getResultCode() + "->" + curatorEvent.getType());
countDownLatch.countDown();
}, service).forPath("/node", "123".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
service.shutdown();
}
}
}
事务操作
@Slf4j
public class CuratorDemo {
public void transaction() {
CuratorFramework curatorFramework = getInstance();
curatorFramework.start();
log.debug("链接成功");
// 事务操作(curator独有的)
try {
// TODO inTransaction 方法已弃用
Collection resultCollections = curatorFramework.inTransaction()
.create().forPath("/demo1", "111".getBytes())
.and().setData().forPath("/demo1", "222".getBytes())
.and().commit();
for (CuratorTransactionResult result : resultCollections) {
log.debug(result.getForPath() + "->" + result.getType());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
监听
@Slf4j
public class CuratorDemo {
/**
* 三种 watcher 来做节点的监听
* pathcache 监视一个路径下子节点的创建、删除、节点数据更新
* NodeCache 监视一个节点的创建、更新、删除
* TreeCache pathcaceh+nodecache 的合体(监视路径下的创建、更新、删除事件),
* 缓存路径下的所有子节点的数据
*/
public void watcher() {
CuratorFramework curatorFramework = getInstance();
curatorFramework.start();
log.debug("链接成功");
// 节点变化 NodeCache
// TODO NodeCache 已弃用
try {
NodeCache cache = new NodeCache(curatorFramework, "/curator", false);
cache.start(true);
cache.getListenable().addListener(() -> log.debug("节点数据发生变化,变化后的结果" +
":" + new String(cache.getCurrentData().getData())));
curatorFramework.setData().forPath("/curator", "123".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
// PatchChildrenCache
// TODO PatchChildrenCache 已弃用
try {
PathChildrenCache cache = new PathChildrenCache(curatorFramework, "/event", true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// Normal / BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT
cache.getListenable().addListener((curatorFramework1, pathChildrenCacheEvent) -> {
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
log.debug("增加子节点");
break;
case CHILD_REMOVED:
log.debug("删除子节点");
break;
case CHILD_UPDATED:
log.debug("更新子节点");
break;
default:
break;
}
});
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event", "event".getBytes());
TimeUnit.SECONDS.sleep(1);
log.debug("1");
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1", "1".getBytes());
TimeUnit.SECONDS.sleep(1);
log.debug("2");
curatorFramework.setData().forPath("/event/event1", "222".getBytes());
TimeUnit.SECONDS.sleep(1);
log.debug("3");
curatorFramework.delete().forPath("/event/event1");
log.debug("4");
} catch (Exception e) {
e.printStackTrace();
}
}
}
弃用的方法日后补上。