分布式学习十二:zookeeper实现数据订阅/发布
【摘要】 数据订阅/发布在分布式集群中,假设数据库发生了改动,就得修改所有分布式服务的数据库配置我们可以通过zookeeper来实现数据库配置的订阅发布我们先初始化数据库配置项环境在zookeeper配置以下数据[zk: localhost:2181(CONNECTED) 51] get /config-server/app1/database{"Host":"127.0.0.1:3300","Use...
数据订阅/发布
在分布式集群中,假设数据库发生了改动,就得修改所有分布式服务的数据库配置
我们可以通过zookeeper来实现数据库配置的订阅发布
我们先初始化数据库配置项环境
在zookeeper配置以下数据
[zk: localhost:2181(CONNECTED) 51] get /config-server/app1/database
{"Host":"127.0.0.1:3300","User":"root","Password":"233274","Database":"test"}
复制
go代码环境准备
引入
github.com/go-zookeeper/zk
复制
go.mod内容为:
module zkStudy
go 1.17
require (
github.com/go-sql-driver/mysql v1.6.0
github.com/go-zookeeper/zk v1.0.2
github.com/jmoiron/sqlx v1.3.4
)
复制
发布数据库配置
我们只需要set path,在zk中将自动把数据发布到订阅此目录的客户端中
以下代码,每2秒更改一次数据库数据
func loopChangeDbConfig() {
var dbConfig = config.DatabaseConfig{Host: "127.0.0.1:3300",User: "root",Password: "123456",Database: "test"}
t := time.NewTicker(2 * time.Second)
for {
select {
case <-t.C:
dbConfig.Password=strconv.Itoa(rand.Intn(999999)+100000)
jsonByte,_ := json.Marshal(dbConfig)
_,err := zkConnect.Set(databaseZKPath,jsonByte,-1)
if err!=nil {
fmt.Println("zk set dbConfig path err :", err)
return
}
}
}
}
复制
订阅数据库配置
通过zk.getW方法,获取数据并返回一个event单向通道,通过此通道可监听获取一条事件更改数据:
func getDatabaseConfig() <-chan zk.Event {
//listen mysql-config path
jsonStrByte, _, event, err := zkConnect.GetW(databaseZKPath)
if err != nil {
panic(err)
}
_ = json.Unmarshal(jsonStrByte, &databaseConfig)
fmt.Printf("%+v 123\n", databaseConfig)
return event
}
复制
获取到event之后,新开协程,进行阻塞获取通道,当获取到数据后,重新获取配置并继续获取一个通道监听数据
func listenDBConfigChange(event <-chan zk.Event) {
var e zk.Event
for {
e = <-event
//if node data change,db reconnect
if e.Type == zk.EventNodeDataChanged {
fmt.Printf("node data changed: %s, \n",e.Path)
event = getDatabaseConfig()
err := db.Close()
if err != nil {
panic(err)
}
connectDb()
dbTest()
}
}
}
复制
运行结果:
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)