package
main
import
(
"context"
"errors"
"fmt"
clientv3
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/clientv3util"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"net"
"os"
"strings"
"sync"
"test/etcd"
"time"
)
// Process identity captured once by the package init function and embedded
// in every service key built by RegisterService.
var (
	// IP is the local address reported by GetLocalIP; it stays empty when
	// no address could be determined.
	IP string
	// PID is the id of the current process as returned by os.Getpid.
	PID int
)
func init(){
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoder := zapcore.NewConsoleEncoder(encoderConfig)
var
clevel zapcore.Level
clevel.Set(
"debug"
)
log := zap.New(zapcore.NewCore(encoder, zapcore.AddSync(os.Stdout), clevel))
zap.ReplaceGlobals(log)
IP, _ = GetLocalIP()
PID = os.Getpid()
}
func GetLocalIP() (string,error){
addrs, err := net.InterfaceAddrs()
if
err != nil {
zap.S().Warn(err)
return
""
, err
}
for
_, addr := range addrs{
if
ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback(){
if
ipnet.IP.To4() != nil {
return
ipnet.IP.
String
(), nil
}
}
}
return
""
, errors.New(
"unable to determine local ip"
)
}
func RegisterService(endpoints []string, key string, value string) *Service{
s := &Service{
Value: value,
endpoints: endpoints,
}
s.TTL =
5
s.Key = strings.Join([]string{key, IP, fmt.Sprintf(
"%d"
, PID)},
"/"
)
s.init()
return
s
}
// Service describes one service instance registered in etcd together with
// the state of its background keep-alive loop.
type Service struct {
	// Key is the registration key; RegisterService builds it as
	// "<key>/<IP>/<PID>".
	Key string
	// Value is the caller-supplied value associated with Key.
	Value string
	// endpoints is the etcd endpoint list handed to etcd.EnableEtcd.
	endpoints []string
	// IsAlive is set to true after a keep-alive response arrives and back to
	// false when the response stream ends or the loop exits.
	IsAlive bool
	// heartCancel cancels the context the keep-alive loop waits on; Stop
	// invokes it.
	heartCancel context.CancelFunc
	// TTL is set to 5 by RegisterService.
	// NOTE(review): presumably the lease TTL in seconds; confirm against
	// registerService.
	TTL int64
	// grant holds the lease grant whose ID is passed to KeepAlive.
	// NOTE(review): presumably populated by registerService; confirm.
	grant *clientv3.LeaseGrantResponse
}
// init hands the service's endpoints to etcd.EnableEtcd and then launches
// the keep-alive loop (start) on its own goroutine. It returns immediately
// and does not wait for the loop to make progress.
//
// This is a method and is unrelated to the package-level init function.
func (s *Service) init() {
	etcd.EnableEtcd(s.endpoints)

	go s.start()
}
func (s *Service) start(){
defer func(){
s.IsAlive =
false
s.deleteService()
etcd.Cli().Lease.Close()
}()
var
aliveRsp <-chan *clientv3.LeaseKeepAliveResponse
var
err error
var
ctx context.Context
for
{
if
!s.IsAlive{
s.registerService()
aliveRsp, err = etcd.Cli().KeepAlive(context.TODO(), s.grant.ID)
if
err != nil{
zap.S().Warn(err)
}
ctx, s.heartCancel = context.WithCancel(context.Background())
}
select{
case
rsp := <- aliveRsp:
if
rsp ==nil{
s.IsAlive =
false
zap.S().Info(
"service missing signal"
)
time.Sleep(time.Second*
5
)
continue
}
s.IsAlive =
true
zap.S().Debugf(
"service alive %v"
, rsp.ID)
case
<- ctx.Done():
zap.S().Info(
"service stopping"
)
return
}
}
}
// Stop asks the keep-alive loop to exit by invoking the stored cancel
// function. It returns immediately and does not wait for the loop's cleanup.
//
// heartCancel is assigned by the start goroutine, so it can still be nil if
// Stop is called right after RegisterService returns, or on a Service that
// was not created by RegisterService. Calling a nil function would panic, so
// that case is logged and ignored; the caller may need to call Stop again
// once the loop is running.
func (s *Service) Stop() {
	if s.heartCancel == nil {
		zap.S().Warn("service stop requested before keep-alive loop started")
		return
	}
	s.heartCancel()
}