基于ectd实现go的服务注册
2021年11月27日 18:42
描述
基于etcd实现go程序的服务注册,所看的学习资料均比较简单, 实际中要经过反复测试,自己写了一个先用着,有坑再填.
需求
1 2 3 4 5 | 1 . 注册一个服务: 程序启动时 2 . 注销服务: a. 程序异常退出时, 自动注销, 有 5 秒TTL延迟; b. 调用Stop()接口主动注销 3 . 健壮性: a.ectd单节点重启, ectd本身支持; b. etcd全部重启后, 能够恢复正常 4 . key的格式: 前缀/ip/pid 5 . value: 字符串.(可以先转为json, 再转string) |
创建etcd客户端
1 |
//不是重点, 摘要如下代码
cli, err := clientv3.New(clientv3.Config{
Endpoints: ec.Endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
创建Service对象, 包含服务注册
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | package main import ( "context" "errors" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/clientv3util" "go.uber.org/zap" "go.uber.org/zap/zapcore" "net" "os" "strings" "sync" "test/etcd" "time" ) var IP string var PID int func init(){ //日志 encoderConfig := zap.NewProductionEncoderConfig() encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder encoder := zapcore.NewConsoleEncoder(encoderConfig) //创建 var clevel zapcore.Level clevel.Set( "debug" ) log := zap.New(zapcore.NewCore(encoder, zapcore.AddSync(os.Stdout), clevel)) //设为全局 zap.ReplaceGlobals(log) // IP, _ = GetLocalIP() PID = os.Getpid() } func GetLocalIP() (string,error){ addrs, err := net.InterfaceAddrs() if err != nil { zap.S().Warn(err) return "" , err } for _, addr := range addrs{ if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback(){ if ipnet.IP.To4() != nil { return ipnet.IP. String (), nil } } } return "" , errors.New( "unable to determine local ip" ) } func RegisterService(endpoints []string, key string, value string) *Service{ s := &Service{ Value: value, endpoints: endpoints, } s.TTL = 5 s.Key = strings.Join([]string{key, IP, fmt.Sprintf( "%d" , PID)}, "/" ) s.init() return s } type Service struct{ Key string //key 格式: /前缀/名称/ip/pid Value string //存放信息 endpoints []string //etcd地址 //心跳 IsAlive bool //是否存活 heartCancel context.CancelFunc //取消心跳 //租约 TTL int64 //租约(秒) grant *clientv3.LeaseGrantResponse //租约 } func (s *Service) init(){ //启用etcd连接 etcd.EnableEtcd(s.endpoints) //心跳 go s.start() } func (s *Service) start(){ defer func(){ s.IsAlive = false //注销服务 s.deleteService() //关闭续约(如果共用会误关?) etcd.Cli().Lease.Close() }() var aliveRsp <-chan *clientv3.LeaseKeepAliveResponse var err error var ctx context.Context for { if !s.IsAlive{ // 注册服务 s.registerService() // 续期信号chan aliveRsp, err = etcd.Cli().KeepAlive(context.TODO(), s.grant.ID) if err != nil{ zap.S().Warn(err) } ctx, s.heartCancel = context.WithCancel(context.Background()) } // 监听心跳信号 select{ case rsp := <- aliveRsp: //异常时无信号 if rsp ==nil{ s.IsAlive = false zap.S().Info( "service missing signal" ) time.Sleep(time.Second* 5 ) continue } s.IsAlive = true zap.S().Debugf( "service alive %v" , rsp.ID) case <- ctx.Done(): zap.S().Info( "service stopping" ) return } } } //停止 func (s *Service) Stop(){ s.heartCancel() } |
[佛說大乘無量壽莊嚴清淨平等覺經pdf](http://www.sxjy360.top/page-download/)
[净土大经科注2014-doc](http://www.sxjy360.top/page-download/)
[此生必看的科学实验-水知道答案](http://www.sxjy360.top/page-download/)
[印光大师十念法(胡小林主讲第1集)](http://www.sxjy360.top/page-download/)