zookeeper简单使用+工具类

Zookeeper是一种分布式协调服务,旨在帮助构建可靠的分布式系统。它提供了一组原语,用于协调不同进程之间的通信和同步。Zookeeper可以用于各种分布式应用程序,例如分布式锁,配置管理,队列等。

Zookeeper的核心概念是znode,它是一种特殊的节点,类似于文件系统中的目录或文件。每个znode都有一个名称和一个关联的数据。Zookeeper维护了一个分层的znode命名空间,类似于文件系统中的目录树。通过创建,删除和更新znode,应用程序可以实现在不同进程之间共享状态的协调。

Zookeeper还提供了一些特殊的znode,例如临时节点和顺序节点,用于实现分布式锁和队列等功能。

Zookeeper使用一种称为“Zab”的协议来保持数据的一致性。Zab协议确保在分布式系统中的所有节点之间,只有一个节点是主节点,其余节点都是从节点。主节点负责处理所有写操作,然后将结果复制到所有从节点。这种方式可以保证系统在任何时候都具有一致的状态。

永久节点和临时节点

Zookeeper中,永久节点和临时节点的不同点在于它们的生命周期和用途。

  1. 生命周期:
  • 永久节点:一旦创建,节点将一直存在直到被显式删除。即使客户端和Zookeeper Server之间的连接断开,节点也会一直存在于Zookeeper Server上。
  • 临时节点:当创建临时节点的客户端和Zookeeper Server短暂的失去连接时,节点会被自动删除。如果客户端与Zookeeper Server之间的连接恢复,则节点将重新出现在Zookeeper Server上。
  1. 用途:
  • 永久节点:用于存储永久性的数据,例如配置信息、元数据等。
  • 临时节点:用于表示在线客户端的状态,例如任务调度节点、任务执行状态、队列中的元素等。

因此,开发人员在使用Zookeeper时需要根据实际需求选择永久节点或临时节点来存储数据。

有序节点和无序节点

Zookeeper中,有序节点和无序节点的不同点在于它们的节点名称和排序方式。

  1. 节点名称:
  • 无序节点:节点名称由用户指定,Zookeeper会检查该名称是否已经存在,如果已存在则会报错。节点名称可以是任何字符串。
  • 有序节点:节点名称由Zookeeper自动分配,节点名称的格式为指定节点名称后跟着一个序号,例如:/myapp/node00001。在分配节点名称时,Zookeeper会根据节点编号的大小来排序节点。
  1. 排序方式:
  • 无序节点:节点无法进行排序,只能根据节点名称进行查找。
  • 有序节点:节点根据节点编号的大小进行排序,可以通过节点的编号来查找节点,而不是通过节点的名称。

有序节点在实际应用中,常用于实现队列或者锁等功能,因为有序节点总是能够保证执行的先后顺序。而无序节点通常用于存储配置信息等,因为修改、删除等操作较频繁,使用有序节点会带来一定的复杂度。

Springboot集成

Curator是一个Zookeeper客户端库,提供了一组简单易用的API,可以帮助开发者更容易地与Zookeeper进行交互。Curator也提供了一些实用的特性,例如重试机制、选举、分布式锁等,可以帮助开发者构建可靠的分布式应用程序。

application.properties

1
2
3
4
5
6
7
8
9
10
11
12
#地址和端口号
curator.zookeeper.address=127.0.0.1:2181
#会话超时时间 单位ms
curator.zookeeper.session_timeout=60000
#连接超时时间 单位ms
curator.zookeeper.connection_timeout=15000
#默认为操作的根节点,为了实现不同的Zookeeper业务之间的隔离,所有操作都是基于该目录进行的
curator.zookeeper.name_space=beixian
#重试间隔时间
curator.zookeeper.base_sleep_time_ms=3000
#重试次数
curator.zookeeper.max_retries=10

CuratorConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
public class CuratorConfig {

@Value("${curator.zookeeper.address}")
private String zkAddress;
@Value("${curator.zookeeper.session_timeout}")
private int sessionTimeout;
@Value("${curator.zookeeper.connection_timeout}")
private int connectionTimeout;
@Value("${curator.zookeeper.name_space}")
private String nameSpace;
@Value("${curator.zookeeper.base_sleep_time_ms}")
private int baseSleepTimeMs;
@Value("${curator.zookeeper.max_retries}")
private int maxRetries;

@Bean(initMethod = "start", destroyMethod = "close")
public CuratorFramework curatorFramework() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
return CuratorFrameworkFactory.builder()
.connectString(zkAddress)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.retryPolicy(retryPolicy)
.namespace(nameSpace)
.build();

}
}

工具类

通过以下工具类实现对zookeeper节点的增删改查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
@Component
@Slf4j
public class ZookeeperService {

@Autowired
private CuratorFramework client;

/**
* 创建永久Zookeeper节点
*
* @param nodePath 节点路径(如果父节点不存在则会自动创建父节点),如:/curator
* @param nodeValue 节点数据
* @return 返回创建成功的节点路径
*/
public String createPersistentNode(String nodePath, String nodeValue) {
try {
return client.create().creatingParentsIfNeeded().forPath(nodePath, nodeValue.getBytes());
} catch (Exception e) {
log.error("创建永久Zookeeper节点失败,nodePath:{},nodeValue:{}", nodePath, nodeValue, e);
}
return null;
}

/**
* 创建永久有序Zookeeper节点
*
* @param nodePath 节点路径(如果父节点不存在则会自动创建父节点),如:/curator
* @param nodeValue 节点数据
* @return 返回创建成功的节点路径
*/
public String createSequentialPersistentNode(String nodePath, String nodeValue) {
try {
return client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT_SEQUENTIAL)
.forPath(nodePath, nodeValue.getBytes());
} catch (Exception e) {
log.error("创建永久有序Zookeeper节点失败,nodePath:{},nodeValue:{}", nodePath, nodeValue, e);
}
return null;
}

/**
* 创建临时Zookeeper节点
*
* @param nodePath 节点路径(如果父节点不存在则会自动创建父节点),如:/curator
* @param nodeValue 节点数据
* @return 返回创建成功的节点路径
*/
public String createEphemeralNode(String nodePath, String nodeValue) {
try {
return client.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(nodePath, nodeValue.getBytes());
} catch (Exception e) {
log.error("创建临时Zookeeper节点失败,nodePath:{},nodeValue:{}", nodePath, nodeValue, e);
}
return null;
}

/**
* 创建临时有序Zookeeper节点
*
* @param nodePath 节点路径(如果父节点不存在则会自动创建父节点),如:/curator
* @param nodeValue 节点数据
* @return 返回创建成功的节点路径
* @since 1.0.0
*/
public String createSequentialEphemeralNode(String nodePath, String nodeValue) {
try {
return client.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(nodePath, nodeValue.getBytes());
} catch (Exception e) {
log.error("创建临时有序Zookeeper节点失败,nodePath:{},nodeValue:{}", nodePath, nodeValue, e);
}
return null;
}

/**
* 检查Zookeeper节点是否存在
*
* @param nodePath 节点路径
* @return boolean 如果存在则返回true
*/
public boolean checkExists(String nodePath) {
try {
Stat stat = client.checkExists().forPath(nodePath);

return stat != null;
} catch (Exception e) {
log.error("检查Zookeeper节点是否存在出现异常,nodePath:{}", nodePath, e);
}
return false;
}

/**
* 获取某个Zookeeper节点的所有子节点
*
* @param nodePath 节点路径
* @return 返回所有子节点的节点名
*/
public List<String> getChildren(String nodePath) {
try {
return client.getChildren().forPath(nodePath);
} catch (Exception e) {
log.error("获取某个Zookeeper节点的所有子节点出现异常,nodePath:{}", nodePath, e);
}
return null;
}

/**
* 获取某个Zookeeper节点的数据
*
* @param nodePath 节点路径
* @return 节点存储的数据
*/
public String getData(String nodePath) {
try {
return new String(client.getData().forPath(nodePath));
} catch (Exception e) {
log.error("获取某个Zookeeper节点的数据出现异常,nodePath:{}", nodePath, e);
}
return null;
}

/**
* 设置某个Zookeeper节点的数据
*
* @param nodePath 节点路径
*/
public void setData(String nodePath, String newNodeValue) {
try {
client.setData().forPath(nodePath, newNodeValue.getBytes());
} catch (Exception e) {
log.error("设置某个Zookeeper节点的数据出现异常,nodePath:{}", nodePath, e);
}
}

/**
* 删除某个Zookeeper节点
*
* @param nodePath 节点路径
*/
public void delete(String nodePath) {
try {
client.delete().guaranteed().forPath(nodePath);
} catch (Exception e) {
log.error("删除某个Zookeeper节点出现异常,nodePath:{}", nodePath, e);
}
}

/**
* 级联删除某个Zookeeper节点及其子节点
*
* @param nodePath 节点路径
*/
public void deleteChildrenIfNeeded(String nodePath) {
try {
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(nodePath);
} catch (Exception e) {
log.error("级联删除某个Zookeeper节点及其子节点出现异常,nodePath:{}", nodePath, e);
}
}

/**
* 监听节点变化
*
* @param nodePath 节点路径
*/
public void cacheListener(String nodePath, CuratorCacheListener listener) {
//1.创建curatorCache对象
CuratorCache curatorCache = CuratorCache.build(client, nodePath);
//2.注册监听器
curatorCache.listenable().addListener(listener);
//3.开启监听:true加载缓冲数据
curatorCache.start();
}
}

监听改动

要通过Curator来监听Zookeeper上的目录,可以使用Curator提供的CuratorCache类。CuratorCache类可以监听一个目录下的所有子节点,并且可以自动处理节点的新增、删除、更新等事件。下面是一个使用CuratorCache类监听Zookeeper上目录的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

@Component
@Slf4j
public class CuratorWatcher {

@Autowired
private CuratorFramework curatorFramework;


@PostConstruct
public void init() {
String path = "/";
CuratorCache curatorCache = CuratorCache.build(curatorFramework, path);
curatorCache.listenable().addListener((type, childData, childData1) -> {
switch (type) {
case NODE_CHANGED:
log.info("节点数据修改");
log.info("修改前的节点路径:" + childData.getPath());
log.info("修改前的节点数据:" + new String(childData.getData()));
log.info("修改后的节点路径:" + childData1.getPath());
log.info("修改后的节点数据:" + new String(childData1.getData()));
break;
case NODE_CREATED:
log.info("新增节点");
log.info("新增节点的路径" + childData1.getPath());
log.info("新增节点的数据:" + new String(childData1.getData()));
break;
case NODE_DELETED:
log.info("删除节点");
log.info("删除节点的路径" + childData.getPath());
log.info("删除节点的数据:" + new String(childData.getData()));
break;
default:
break;
}
});
curatorCache.start();
}
}

简单实现分布式锁

InterProcessMutex是Zookeeper中提供的一种分布式锁实现方式。它基于Zookeeper的有序节点的特性实现,通过在Zookeeper上创建一个有序节点来代表锁,并通过检查该节点的前一个节点是否存在来控制锁的获取和释放。

InterProcessMutex的主要方法包括:

  1. acquire():获取锁,如果锁处于可用状态,则获取锁,在锁被释放之前将一直阻塞。
  2. acquire(long time, TimeUnit unit):尝试获取锁,最多阻塞指定的时间,如果成功获取锁则返回true,否则返回false。
  3. release():释放锁,如果当前线程持有锁,则释放锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
@Slf4j
public class ZookeeperLockService {
@Autowired
private CuratorFramework client;

public boolean lock(String path, long timeMs) {
InterProcessMutex lock = new InterProcessMutex(client, path);
try {
return lock.acquire(timeMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return false;
}

public boolean unlock(String path) {
InterProcessMutex lock = new InterProcessMutex(client, path);
try {
lock.release();
return true;
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return false;
}
}

zookeeper简单使用+工具类
https://cason.work/2023/04/13/zookeeper简单使用-工具类/
作者
Cason Mo
发布于
2023年4月13日
许可协议