nats-server系统服务只能使用sc命令注册

2021年8月19日 02:42

 

起因描述

 
windows环境下nats-server注册成系统, 官方推荐是sc命令
 
而我觉得sc命令台太麻烦, 为什么不用github.com/kardianos/service,或者更方便的工具呢?
 

方法一:使用nssm通过界面进行配置

 
1. 下载地址
 
http://nssm.cc/download
 
2. 过程省略
 
3. 结果死活不行,由于是第三方工具,出错找不到原因,放弃
 
 

方法二:通过服务外壳作为系统服务

 
* 介绍
 
使用go生成一个demo.exe,再将demo.exe注册成系统,demo.exe读取配置文件,然后demo.exe通过exec.Command启动nats-server.exe
 
go比较好用的库可以用github.com/kardianos/service
 
 
* demo.exe启动nats-server.exe关键源码
 
p.cmd = exec.Command(fullExec, p.Args...)
p.cmd.Dir = p.Dir
p.cmd.Env = append(os.Environ(), p.Env...)
 
* demo.json配置示例
 
{
"Name": "demo",
"DisplayName": "demo",
"Description": "demo for nats-server",

"Dir": "d:\\nats-server",
"Exec": "xxxx\\nats-server.exe",
"Args": ["-c ","xxx\\nats-server.conf"],
"Env": [
],

"Stderr": "C:\\log\\nats_err.log",
"Stdout": "C:\\log\\nats_out.log"
}
 
* 结果报错
 
The service process could not connect to the service controller.
 
意思是一个服务不能连接到另一个服务,啥意思呢? 也就是通过exec.Command启动的竟然是服务,不应该是exe么.
 
* nats-server启动源码分析
 
nats-server.exe的启动入口源码
 
func Run(server *Server) error {
    //入口1:docker
if dockerized {
server.Start()
return nil
}
isInteractive, err := svc.IsAnInteractiveSession()
if err != nil {
return err
}
    //入口2: 交互模式
if isInteractive {
server.Start()
return nil
}
    //入口3: 启动服务(serviceName被写死了,只能是nats-server)
return svc.Run(serviceName, &winServiceWrapper{server})
}
 
* 错误原因总结
 
当以外壳方式启动时,走的是"入口3",也是启动服务,如果没有nats-server,它就啥事也没干,结果nats-server并没有启动
 

方法三:sc命令

 
官方推荐
 

总结

 
1、nat-server只能以sc命令注册成服务。怕麻烦可以写一个bat。
 
2、nats-server服务名只能是nats-server
 
 

Tags: nats
评论(75) 阅读(1060)

zap日志写入通道被覆盖

2021年8月17日 20:00

 

描述

 
实现了一个io.Writer接口,允许zap日志写入,传递给界面,出现了日志被覆盖、重复的情况。
 

原因

 
io.Writer的接口Write传入的[]byte真实类型是slice。通道传递slice时,用的是引用传递,所以真实数据有被覆盖的情况 
 

示例

 
* zap初始化
 
...
out = COut()
zapcore.NewCore(encoder, zapcore.AddSync(out), clevel),
...
 
* 自定义日志写入Channel
 
package logs

var _GlobalChanOutput = &ChanOutput{
outs: make([]chan *[]byte, 0),
}

/*---------------------------------------
全局函数
示例
logs.COut().AddOut(xx)
---------------------------------------*/

//全局ChanOutput
func COut() *ChanOutput{
return _GlobalChanOutput
}

/*---------------------------------------
log to channel
---------------------------------------*/

type ChanOutput struct{
outs []chan *[]byte
}

//io.Writer接口
func (c *ChanOutput) Write(p []byte) (n int, err error) {
count :=0

//数据复制(防止被覆盖)
_p := make([]byte, len(p))
copy(_p, p)

for _, out :=range c.outs{
select{
case out <- &_p:
count += 1
default:
}
}
return count, nil
}

func (c *ChanOutput) AddOut(out chan *[]byte) {
c.outs = append(c.outs, out)
}
 
* 读取日志
 
logOut := make(chan *[]byte, 1024)
logs.COut().AddOut(logOut)

pctx, pctxCancel := context.WithCancel(context.Background())
go func(ctx context.Context){
    select {
    case msg := <- logOut:
        fmt.Println(string(*msg))
    case <-ctx.Done():
        break
    }
}(pctx)
 
 

评论(29) 阅读(648)

msgpack代替json防止丢失类型

2021年6月30日 15:12

 

描述

 
开发了一个通信协议, 协议对数据类型比较敏感, 根据不同数据数据类型使用不同格式。golang的数据类型能够满足需求。
 
然而,将协议转为http方式时,如果传入json,会发现数据类型不符合预期
 
例如,本来是应该int结果是float64; 本来应该是[]byte,结果是string
 

环境

 
http服务器: gin
 
开发语言: golang
 

解决方法探寻

 
什么传输协议会带上数据类型?
 
* json类型丢失
 
默认情况下gin的BindJSON,可将传入的参数绑定到具体的结构体,产生类型转换。然后,我的结果需要是动态的,不知道参数的具体内容。
 
* 自定义参数格式,将类型带上。例如 marchinery
 
Arg struct{
    Type string
    Value interface{}
}
 
缺点:太麻烦
 
* 常用协议中选择
 
xml         可以用,不理想
 
yaml        不能解决问题
 
protobuf    不能解决问题
 
messagepack 有详细的数据类型,完美
 
 

msgpack替代json

 
gin示例
 
func SendXX(c *gin.Context){
    rawData, _ := c.GetRawData()
    var param interface{}
    msgpack.Unmarshal(rawData, ¶m)

result := struct{
Name string
}{
Name: "xxx",
}
resultData, _ := msgpack.Marshal(result)
c.Data(200, "application/x-msgpack", resultData)
}
 

备注

 
* gin自带的msgpack版本是1.1.7不好用,不会自动进行类型转换。 最好改用  github.com/vmihailenco/msgpack/v5
 
* 替换gin中的部分接口就可以,不用全部改为msgpack
 
 
 
 

评论(124) 阅读(1382)

nats的安装docker方式

2021年6月12日 12:02

描述

    docker-compose方式启动nats-server。nats的镜像非常小, docker-cpomse进行管理。下面是配置方式
 

docker-compose.yaml配置 

 
version: "2"
services:
   nats-server:
       #image: provide/nats-server:latest
       image: nats:latest
       volumes:
           - ./nats-server.conf:/nats-server.conf
       ports:
           - 4222:4222
 

配置文件nats-server.conf

# Client port of 4222 on all interfaces
port: 4222

# HTTP monitoring port
monitor_port: 8222

# This is for clustering multiple servers together.
cluster {

  # Route connections to be received on any interface on port 6222
  port: 6222

  # Routes are protected, so need to use them with --routes flag
  # e.g. --routes=nats-route://ruser:T0pS3cr3t@otherdockerhost:6222
  authorization {
    user: ruser
    password: T0pS3cr3t
    timeout: 2
  }

  # Routes are actively solicited and connected to from this server.
  # This Docker image has none by default, but you can pass a
  # flag to the gnatsd docker image to create one to an existing server.
  routes = []
}
 

启动

 
docker-compose -f docker-compose.yaml up/restart/start/stop
 

Tags: nats
评论(100) 阅读(838)

go实现Event事件模型修正

2021年6月03日 22:39

 

描述

 
之前文章中实现了event事件模型,实际使用中发现了几个问题
 
 
1. 并发会出现情况send先执行, 此时还没有waiter。将wait拆分成addWaiter和wait()
 
2. 不能使用缓冲通道。会出现send完成,wait还未收到的情况。
 

用法修正

 
e.Reset()
waiter := e.AddWaiter()
waiter := e.Wait(waiter, xxx)
e.Send(xxx)
 

事件实现

package common

import (
	"context"
	"errors"
	"go.uber.org/zap"
	"time"
)

var NOT_USED = struct{}{}

func NewEvent() *Event {
	e := &Event{
		log: zap.S(),
	}
	e.Reset()
	return e
}

type Event struct{
	//监听者
	waiters  []chan interface{}
	//结果
	result interface{}
	//上下文控制
	ctxBg 	context.Context
	ctxCancel context.CancelFunc
	//日志
	log *zap.SugaredLogger
}


func (e *Event) AddWaiter() *chan interface{}{
	//等待者
	resultChan := make(chan interface{}, 0)
	e.waiters = append(e.waiters, resultChan)
	return &resultChan
}

//等待结果
func (e *Event) Wait(waiter *chan interface{}, timeout time.Duration) (interface{}, error){
	if e.result ==NOT_USED{
		ctx, cancel := context.WithTimeout(e.ctxBg, time.Second*timeout)
		defer cancel()

		//等待
		select{
		case result := <- *waiter:
			return result, nil
		case <- ctx.Done():
			if ctx.Err() == context.Canceled{
				return nil, nil
			}
			return nil, errors.New("event wait timeout")
		}
		return nil, nil
	}else{
		return e.result, nil
	}
}

//发送结果
func (e *Event) Send(result interface{}) error{
	//防止发送多次
	if e.result !=NOT_USED{
		return errors.New("Event is used")
	}

	ctx, cancel := context.WithTimeout(e.ctxBg, time.Second*3)
	defer cancel()

	for _, resultChan := range e.waiters{
		select{
		case resultChan <- result:
		case <- ctx.Done():
			e.log.Warnf("Event.Send %p resultChan=%d, result=%v", e, len(resultChan), result)
		}
	}
    e.result = result
	return nil
}

//重置
func (e *Event) Reset(){
	if e.ctxBg !=nil{
		e.ctxCancel()
	}

	e.ctxBg, e.ctxCancel = context.WithCancel(context.Background())
	e.waiters = nil
	e.result = NOT_USED
}
 
 

评论(114) 阅读(713)

rabbitmq辣眼睛的忙等

2021年5月28日 07:13

背景

 
用golang做了一个分布式结构的系统, 消息服务器用了rabbitmq。
 
整体是基于machinery+rabbitmq形式的分布式任务系统
 
在做压力测试时,发现rabbitmq的cpu使用率飙升
 
 

分析

 
* 官方给出的解释
 
scheduler采用了忙等,来提高消息传递性能
 
https://www.rabbitmq.com/runtime.html#cpu
 
其中也讲了一些方法来降低cpu使用率
 
* 官方建议
 
1. rabbtimq最好是装在单独的服务器,不与其它应用共用。
 
2. 也介绍了修改cpu使用个数的方法
 
* 对我而言
 
通信关cpu什么事,导致cpu飙升,就是不应该
 
* 站在传统语言java/python/...等语言角度
 
忙等好像也没啥
 
* 站在go的角度
 
忙等是上世纪的技术了。
 
看起来太辣眼睛。
 
* 从我的应用场景上来看
 
系统规模可大可小, 而且业务高峰时间很短
 
在系统规模小时,不大愿意单独给rabbitmq一台服务器
 
在业务高峰的这几分钟,正好会影响其它系统
 
* 是否有其它选择
 
nats
 
* 来源
 
 
[佛說大乘無量壽莊嚴清淨平等覺經pdf](http://doc.sxjy360.top/book/佛說大乘無量壽莊嚴清淨平等覺經(難字注音).pdf)
[净土大经科注2014-doc](http://doc.sxjy360.top/book/净土大经科注2014-doc.zip)
 

Tags: rabbitmq
评论(148) 阅读(851)

go实现Event事件context版

2021年5月22日 08:27

描述

 
上一篇文章,写了go如何实现Event事件,有几个可以优化的地方.
sync.Map保存结果可以不用这么复杂.
改用context关闭通道更加方便
 
 

事件实现

 
package common

import (
	"context"
	"errors"
	"go.uber.org/zap"
	"time"
)

func NewEvent() *Event {
	e := &Event{
		log: zap.S(),
	}
	e.Reset()
	return e
}

type Event struct{
	//监听者
	waiters  []chan interface{}
	//结果
	result interface{}
	//上下文控制
	ctxBg 	context.Context
	ctxCancel context.CancelFunc
	//日志
	log *zap.SugaredLogger
}

//等待结果
func (e *Event) Wait(timeout time.Duration) interface{}{
	ctx, cancel := context.WithTimeout(e.ctxBg, time.Second*timeout)
	defer cancel()

	//等待者
	resultChan := make(chan interface{}, 1)
	e.waiters = append(e.waiters, resultChan)

	//等待
	select{
	case result := <- resultChan:
		return result
	case <- ctx.Done():
	}
	return nil
}

//发送结果
func (e *Event) Send(result interface{}) error{
	//防止发送多次
	if e.result !=nil{
		return errors.New("Event is used")
	}

	e.result = result
	for _, resultChan := range e.waiters{
		select{
		case resultChan <- result:
		default:
			e.log.Warnf("Event.Send %p resultChan=%d, result=%v", e, len(resultChan), result)
		}
	}
	return nil
}

//重置
func (e *Event) Reset(){
	if e.ctxBg !=nil{
		e.ctxCancel()
	}

	e.ctxBg, e.ctxCancel = context.WithCancel(context.Background())
	e.waiters = nil
	e.result = nil
}
 
 

评论(264) 阅读(1048)

go如何实现并发安全的优先级队列

2021年5月20日 19:17

 

描述

 
go的如何实现优先级队列?
 
优先级别队列是否并发安全?
 

我的需求场景

 
实现了一个通信协议,收发消息。发送通道会阻塞。如果拥堵时,要控制拥堵消息的优先顺序。
 
优先级用Priority加上时间, Priority小优先, 早到优先。
 
 

原理

 
1. 通过实现官方接口heap, 得到优先级队列
 
2. 使用用chan进行控制并发控制,达到并发安全的效果
 
 

优先队列实现

 
package queue

import (
	"xxxx/packets"
	"container/heap"
	"time"
)

//---------------------------------------
//		优先队列(提供并发安全接口)
//---------------------------------------

func NewPriorityQueue() *PriorityQueue{
	q := &PriorityQueue{
		isPushing: false,
		ch: make(chan *QueueItem, 1),
	}
	return q
}


type PriorityQueue struct{
	//队列(最小堆)
	items []*QueueItem	//堆
	ch chan *QueueItem	//同步控制
	isPushing bool	//推送状态
}

// 数据项
type QueueItem struct {
	Value    	*packets.Packet	// value
	Priority 	int    				// 优先级(越小越靠前)
	JoinTime    time.Time			// 加入时间
	index 		int  				// 索引
}

//---------------------------------------
//		heap接口
//---------------------------------------
func (pq PriorityQueue) Len() int { return len(pq.items) }

func (pq PriorityQueue) Less(i, j int) bool {
	// priority越小越靠前
	if pq.items[i].Priority != pq.items[j].Priority{
		return pq.items[i].Priority < pq.items[j].Priority
	}else{
		return pq.items[i].JoinTime.Sub(pq.items[j].JoinTime) <0
	}
}

func (pq PriorityQueue) Swap(i, j int) {
	pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
	pq.items[i].index = i
	pq.items[j].index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
	n := len(pq.items)
	item := x.(*QueueItem)
	item.index = n
	pq.items = append(pq.items, item)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old.items)
	if n==0{
		return nil
	}
	item := old.items[n-1]
	old.items[n-1] = nil  // avoid memory leak
	item.index = -1 // for safety
	pq.items = old.items[0 : n-1]
	return item
}

// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *QueueItem, value *packets.Packet, priority int) {
	item.Value = value
	item.Priority = priority
	heap.Fix(pq, item.index)
}

//---------------------------------------
//		同步接口
//---------------------------------------
func (pq *PriorityQueue) Get(timeout time.Duration) *QueueItem{
	timeout |= 30

	ticker := time.NewTicker(time.Second*timeout)
	defer ticker.Stop()

	select {
	case result := <- pq.ch:
		return result
	case <- ticker.C:
	}
	return nil
}

func (pq *PriorityQueue) PutNoWait(item *QueueItem){
	heap.Push(pq, item)
	//启用推送
	pq.startPushing()
}

//负责推送数据
func (pq *PriorityQueue) startPushing(){
	if pq.isPushing{
		return
	}
	go pq._startPushing()
}

func (pq *PriorityQueue) _startPushing(){
	for pq.Len()>0{
		pq.isPushing = true

		x := heap.Pop(pq)
		if x==nil{
			continue
		}
		item := x.(*QueueItem)
		pq.ch <- item
	}
	pq.isPushing = false
}
 
### 用法示例
 
//并发安全用法
func TestSendQueue(t *testing.T){
	q := NewPriorityQueue()
	q.PutNoWait(&QueueItem{
		Value: 	&packets.Packet{
						Header: &packets.SecsHeader{
						System: 1,
					},
					Data: []byte{},
				},
		Priority: 100,
		JoinTime: time.Now(),
	})
    item := q.Get(30)
    fmt.Println(item)
}
 

总结

 
上面已经满足我的需求场景了。 队列的删除、撤销,并不是我的应用场景,不在这里考虑,也不是队列该做的事。
 
 
 

评论(143) 阅读(766)

go实现Event事件

2021年5月16日 21:12

 

描述

 
go协程之间用通道通信,如何将它改造成事件形式。
 
 

需求描述

 
以下是我的需求
 
e := NewEvent()
//协程1等待结果, 10是等待时间
e.Wait(10)
//协程2发送结果
e.Send(xxxx)
 

原理

 
原理比较简单,不描述了
 

示例

package common

import (
	"go.uber.org/zap"
	"sync"
	"time"
)

var NOT_USED interface{} = struct{}{}

func NewEvent() *Event {
	e := &Event{
		log: zap.S(),
	}
	e.Reset()
	return e
}

type Event struct{
	waiters  sync.Map
	log *zap.SugaredLogger
}

func (e *Event) Wait(timeout time.Duration) interface{}{
	ticker := time.NewTicker(time.Second*timeout)
	defer ticker.Stop()

	resultChan := make(chan interface{}, 1)
	e.waiters.Store(resultChan, true)

	select{
	case result := <- resultChan:
		if result == NOT_USED{
			return nil
		}
		return result
	case <- ticker.C:
		close(resultChan)
	}

	e.waiters.Delete(resultChan)
	return nil
}

func (e *Event) Send(result interface{}){
	e.waiters.Range(func(k, v interface{})bool{
		resultChan := k.(chan interface{})
		select{
		case resultChan <- result:
			e.waiters.Delete(resultChan)
		default:
			e.log.Warnf("Event.Send %p resultChan=%d, result=%v", e, len(resultChan), result)
		}
		return true
	})
}

func (e *Event) Reset(){
	e.waiters.Range(func(k, v interface{})bool{
		resultChan := k.(chan interface{})
		resultChan <- NOT_USED
		e.waiters.Delete(resultChan)
		return true
	})
}
 
 
 

评论(111) 阅读(695)

golang如何正确关闭计时器

2021年5月16日 20:14

 

描述

 
* go的官方库计时器, 有一个陷阱,ticker.Stop()如果不注意会导致协程无法关闭。
* 如果我们想强制中断计时,该如何做?
 

示例:ticker.Stop()会导致协程无法结束

 
下面示例便于理解
 
func main(){
    ticker := time.NewTicker(time.Second*10)
    go func(){
        log.Println("go start")
        select{
        case <-ticker.C:
            log.Println("ticker stop")
        }
        //会发现这一句化一直不打印
        log.Println("go end")
    }()
    time.Sleep(1)
    //stop无法结束上面的协程
    ticker.Stop()
    time.Sleep(20)
    log.Println("main end")
}
 
 

使用标记值终止计时器

 
func WaitResponse(queueChan *chan Packet, timeout time.Duration) *Packet, error{
    ticker := time.NewTicker(time.Second*timeout)
    defer ticker.Stop()

	var response Packet

	var err error
	select{
	case response = <-*queueChan:
        if response == "标记值xxx"{
            return nil, errors.New("terminate response")
        }
	case <-ticker.C:
		err = errors.New("wait response timeout")
	}
	return &response, err
}

go WaitResponse(queueChan, time.Duration(60))

queueChan <- "标记值xxx"
 
 
 

 

评论(65) 阅读(606)