zap日志写入通道被覆盖

2021年8月17日 20:00

 

描述

 
实现了一个io.Writer接口,允许zap日志写入,传递给界面,出现了日志被覆盖、重复的情况。
 

原因

 
io.Writer的接口Write传入的[]byte真实类型是slice。通道传递slice时,用的是引用传递,所以真实数据有被覆盖的情况 
 

示例

 
* zap初始化
 
...
out = COut()
zapcore.NewCore(encoder, zapcore.AddSync(out), clevel),
...
 
* 自定义日志写入Channel
 
package logs

var _GlobalChanOutput = &ChanOutput{
outs: make([]chan *[]byte, 0),
}

/*---------------------------------------
全局函数
示例
logs.COut().AddOut(xx)
---------------------------------------*/

//全局ChanOutput
func COut() *ChanOutput{
return _GlobalChanOutput
}

/*---------------------------------------
log to channel
---------------------------------------*/

type ChanOutput struct{
outs []chan *[]byte
}

//io.Writer接口
func (c *ChanOutput) Write(p []byte) (n int, err error) {
count :=0

//数据复制(防止被覆盖)
_p := make([]byte, len(p))
copy(_p, p)

for _, out :=range c.outs{
select{
case out <- &_p:
count += 1
default:
}
}
return count, nil
}

func (c *ChanOutput) AddOut(out chan *[]byte) {
c.outs = append(c.outs, out)
}
 
* 读取日志
 
logOut := make(chan *[]byte, 1024)
logs.COut().AddOut(logOut)

pctx, pctxCancel := context.WithCancel(context.Background())
go func(ctx context.Context){
    select {
    case msg := <- logOut:
        fmt.Println(string(*msg))
    case <-ctx.Done():
        break
    }
}(pctx)
 
 

评论(29) 阅读(650)

msgpack代替json防止丢失类型

2021年6月30日 15:12

 

描述

 
开发了一个通信协议, 协议对数据类型比较敏感, 根据不同数据数据类型使用不同格式。golang的数据类型能够满足需求。
 
然而,将协议转为http方式时,如果传入json,会发现数据类型不符合预期
 
例如,本来是应该int结果是float64; 本来应该是[]byte,结果是string
 

环境

 
http服务器: gin
 
开发语言: golang
 

解决方法探寻

 
什么传输协议会带上数据类型?
 
* json类型丢失
 
默认情况下gin的BindJSON,可将传入的参数绑定到具体的结构体,产生类型转换。然后,我的结果需要是动态的,不知道参数的具体内容。
 
* 自定义参数格式,将类型带上。例如 marchinery
 
Arg struct{
    Type string
    Value interface{}
}
 
缺点:太麻烦
 
* 常用协议中选择
 
xml         可以用,不理想
 
yaml        不能解决问题
 
protobuf    不能解决问题
 
messagepack 有详细的数据类型,完美
 
 

msgpack替代json

 
gin示例
 
func SendXX(c *gin.Context){
    rawData, _ := c.GetRawData()
    var param interface{}
    msgpack.Unmarshal(rawData, ¶m)

result := struct{
Name string
}{
Name: "xxx",
}
resultData, _ := msgpack.Marshal(result)
c.Data(200, "application/x-msgpack", resultData)
}
 

备注

 
* gin自带的msgpack版本是1.1.7不好用,不会自动进行类型转换。 最好改用  github.com/vmihailenco/msgpack/v5
 
* 替换gin中的部分接口就可以,不用全部改为msgpack
 
 
 
 

评论(127) 阅读(1391)

人生枷锁之一:傲慢

2021年6月15日 06:08

 

问题

 
有时会生出轻视之心,觉得某某能力不行。确实它真的能力不行,为什么会产生这种情绪?
 
别人与我何干,应该很平淡才对。
 
这是什么?
 
在什么地方是否会出现?
 
对我有多大危害?
 
 

 这是什么?

 
傲慢
 
 

什么地方会出现?

 
1、熟人之间。熟悉了对方,才会有分别判断,才会产生某某不行的情绪。
 
2、能力、相貌、财富、权势、等等不平等的地方
 
 

实例

 
1、骂人、打人。傲慢是原因之一。
 
2、刻薄、不能容人。傲慢是原因。
 
3、怨恨,愤世嫉俗。有才能而不得志,傲慢是原因。
 
4、嫌弃、不如意。觉得周围人不如你,必然产生嫌弃感。
 
 

对我有多大危害?

 
人事环境决定个人的命运。而傲慢在破坏命运。
 
 

客观思惟

 
1、环境决定命运,而不是能力(xxx)。
 
2、能力(xxx)强又如何?能带给我们什么,没有人支持,没有平台,没有环境,什么也是不。
 
3、能力(xxx)只是果报,可以修得到,又不是永恒的。
 
4、别人不如你,代表着这个环境需要你。如果环境不需要你,排斥你,会怎么样?
 
5、人与人之间互助生存,此时正代表你需要承担责任。
 
 

应该如何做?

 
能力(xxx)应该发挥它的正面作用,而不是发挥负面作用。
 
 

总结

 
1、上天给了我们某方面的优势,是一把双刃剑,是福也是祸。
 
2、傲慢是潜意识,根深蒂固,很难被自己察觉到。
 
3、值得花大部分精力去对治它。
 
 

Tags: 体悟
评论(43) 阅读(592)

nats的安装docker方式

2021年6月12日 12:02

描述

    docker-compose方式启动nats-server。nats的镜像非常小, docker-cpomse进行管理。下面是配置方式
 

docker-compose.yaml配置 

 
version: "2"
services:
   nats-server:
       #image: provide/nats-server:latest
       image: nats:latest
       volumes:
           - ./nats-server.conf:/nats-server.conf
       ports:
           - 4222:4222
 

配置文件nats-server.conf

# Client port of 4222 on all interfaces
port: 4222

# HTTP monitoring port
monitor_port: 8222

# This is for clustering multiple servers together.
cluster {

  # Route connections to be received on any interface on port 6222
  port: 6222

  # Routes are protected, so need to use them with --routes flag
  # e.g. --routes=nats-route://ruser:T0pS3cr3t@otherdockerhost:6222
  authorization {
    user: ruser
    password: T0pS3cr3t
    timeout: 2
  }

  # Routes are actively solicited and connected to from this server.
  # This Docker image has none by default, but you can pass a
  # flag to the gnatsd docker image to create one to an existing server.
  routes = []
}
 

启动

 
docker-compose -f docker-compose.yaml up/restart/start/stop
 

Tags: nats
评论(100) 阅读(850)

go实现Event事件模型修正

2021年6月03日 22:39

 

描述

 
之前文章中实现了event事件模型,实际使用中发现了几个问题
 
 
1. 并发会出现情况send先执行, 此时还没有waiter。将wait拆分成addWaiter和wait()
 
2. 不能使用缓冲通道。会出现send完成,wait还未收到的情况。
 

用法修正

 
e.Reset()
waiter := e.AddWaiter()
waiter := e.Wait(waiter, xxx)
e.Send(xxx)
 

事件实现

package common

import (
	"context"
	"errors"
	"go.uber.org/zap"
	"time"
)

var NOT_USED = struct{}{}

func NewEvent() *Event {
	e := &Event{
		log: zap.S(),
	}
	e.Reset()
	return e
}

type Event struct{
	//监听者
	waiters  []chan interface{}
	//结果
	result interface{}
	//上下文控制
	ctxBg 	context.Context
	ctxCancel context.CancelFunc
	//日志
	log *zap.SugaredLogger
}


func (e *Event) AddWaiter() *chan interface{}{
	//等待者
	resultChan := make(chan interface{}, 0)
	e.waiters = append(e.waiters, resultChan)
	return &resultChan
}

//等待结果
func (e *Event) Wait(waiter *chan interface{}, timeout time.Duration) (interface{}, error){
	if e.result ==NOT_USED{
		ctx, cancel := context.WithTimeout(e.ctxBg, time.Second*timeout)
		defer cancel()

		//等待
		select{
		case result := <- *waiter:
			return result, nil
		case <- ctx.Done():
			if ctx.Err() == context.Canceled{
				return nil, nil
			}
			return nil, errors.New("event wait timeout")
		}
		return nil, nil
	}else{
		return e.result, nil
	}
}

//发送结果
func (e *Event) Send(result interface{}) error{
	//防止发送多次
	if e.result !=NOT_USED{
		return errors.New("Event is used")
	}

	ctx, cancel := context.WithTimeout(e.ctxBg, time.Second*3)
	defer cancel()

	for _, resultChan := range e.waiters{
		select{
		case resultChan <- result:
		case <- ctx.Done():
			e.log.Warnf("Event.Send %p resultChan=%d, result=%v", e, len(resultChan), result)
		}
	}
    e.result = result
	return nil
}

//重置
func (e *Event) Reset(){
	if e.ctxBg !=nil{
		e.ctxCancel()
	}

	e.ctxBg, e.ctxCancel = context.WithCancel(context.Background())
	e.waiters = nil
	e.result = NOT_USED
}
 
 

评论(127) 阅读(754)

rabbitmq辣眼睛的忙等

2021年5月28日 07:13

背景

 
用golang做了一个分布式结构的系统, 消息服务器用了rabbitmq。
 
整体是基于machinery+rabbitmq形式的分布式任务系统
 
在做压力测试时,发现rabbitmq的cpu使用率飙升
 
 

分析

 
* 官方给出的解释
 
scheduler采用了忙等,来提高消息传递性能
 
https://www.rabbitmq.com/runtime.html#cpu
 
其中也讲了一些方法来降低cpu使用率
 
* 官方建议
 
1. rabbtimq最好是装在单独的服务器,不与其它应用共用。
 
2. 也介绍了修改cpu使用个数的方法
 
* 对我而言
 
通信关cpu什么事,导致cpu飙升,就是不应该
 
* 站在传统语言java/python/...等语言角度
 
忙等好像也没啥
 
* 站在go的角度
 
忙等是上世纪的技术了。
 
看起来太辣眼睛。
 
* 从我的应用场景上来看
 
系统规模可大可小, 而且业务高峰时间很短
 
在系统规模小时,不大愿意单独给rabbitmq一台服务器
 
在业务高峰的这几分钟,正好会影响其它系统
 
* 是否有其它选择
 
nats
 
* 来源
 
 
[佛說大乘無量壽莊嚴清淨平等覺經pdf](http://doc.sxjy360.top/book/佛說大乘無量壽莊嚴清淨平等覺經(難字注音).pdf)
[净土大经科注2014-doc](http://doc.sxjy360.top/book/净土大经科注2014-doc.zip)
 

Tags: rabbitmq
评论(154) 阅读(866)

go实现Event事件context版

2021年5月22日 08:27

描述

 
上一篇文章,写了go如何实现Event事件,有几个可以优化的地方.
sync.Map保存结果可以不用这么复杂.
改用context关闭通道更加方便
 
 

事件实现

 
package common

import (
	"context"
	"errors"
	"go.uber.org/zap"
	"time"
)

func NewEvent() *Event {
	e := &Event{
		log: zap.S(),
	}
	e.Reset()
	return e
}

type Event struct{
	//监听者
	waiters  []chan interface{}
	//结果
	result interface{}
	//上下文控制
	ctxBg 	context.Context
	ctxCancel context.CancelFunc
	//日志
	log *zap.SugaredLogger
}

//等待结果
func (e *Event) Wait(timeout time.Duration) interface{}{
	ctx, cancel := context.WithTimeout(e.ctxBg, time.Second*timeout)
	defer cancel()

	//等待者
	resultChan := make(chan interface{}, 1)
	e.waiters = append(e.waiters, resultChan)

	//等待
	select{
	case result := <- resultChan:
		return result
	case <- ctx.Done():
	}
	return nil
}

//发送结果
func (e *Event) Send(result interface{}) error{
	//防止发送多次
	if e.result !=nil{
		return errors.New("Event is used")
	}

	e.result = result
	for _, resultChan := range e.waiters{
		select{
		case resultChan <- result:
		default:
			e.log.Warnf("Event.Send %p resultChan=%d, result=%v", e, len(resultChan), result)
		}
	}
	return nil
}

//重置
func (e *Event) Reset(){
	if e.ctxBg !=nil{
		e.ctxCancel()
	}

	e.ctxBg, e.ctxCancel = context.WithCancel(context.Background())
	e.waiters = nil
	e.result = nil
}
 
 

评论(279) 阅读(1082)

go如何实现并发安全的优先级队列

2021年5月20日 19:17

 

描述

 
go的如何实现优先级队列?
 
优先级别队列是否并发安全?
 

我的需求场景

 
实现了一个通信协议,收发消息。发送通道会阻塞。如果拥堵时,要控制拥堵消息的优先顺序。
 
优先级用Priority加上时间, Priority小优先, 早到优先。
 
 

原理

 
1. 通过实现官方接口heap, 得到优先级队列
 
2. 使用用chan进行控制并发控制,达到并发安全的效果
 
 

优先队列实现

 
package queue

import (
	"xxxx/packets"
	"container/heap"
	"time"
)

//---------------------------------------
//		优先队列(提供并发安全接口)
//---------------------------------------

func NewPriorityQueue() *PriorityQueue{
	q := &PriorityQueue{
		isPushing: false,
		ch: make(chan *QueueItem, 1),
	}
	return q
}


type PriorityQueue struct{
	//队列(最小堆)
	items []*QueueItem	//堆
	ch chan *QueueItem	//同步控制
	isPushing bool	//推送状态
}

// 数据项
type QueueItem struct {
	Value    	*packets.Packet	// value
	Priority 	int    				// 优先级(越小越靠前)
	JoinTime    time.Time			// 加入时间
	index 		int  				// 索引
}

//---------------------------------------
//		heap接口
//---------------------------------------
func (pq PriorityQueue) Len() int { return len(pq.items) }

func (pq PriorityQueue) Less(i, j int) bool {
	// priority越小越靠前
	if pq.items[i].Priority != pq.items[j].Priority{
		return pq.items[i].Priority < pq.items[j].Priority
	}else{
		return pq.items[i].JoinTime.Sub(pq.items[j].JoinTime) <0
	}
}

func (pq PriorityQueue) Swap(i, j int) {
	pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
	pq.items[i].index = i
	pq.items[j].index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
	n := len(pq.items)
	item := x.(*QueueItem)
	item.index = n
	pq.items = append(pq.items, item)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old.items)
	if n==0{
		return nil
	}
	item := old.items[n-1]
	old.items[n-1] = nil  // avoid memory leak
	item.index = -1 // for safety
	pq.items = old.items[0 : n-1]
	return item
}

// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *QueueItem, value *packets.Packet, priority int) {
	item.Value = value
	item.Priority = priority
	heap.Fix(pq, item.index)
}

//---------------------------------------
//		同步接口
//---------------------------------------
func (pq *PriorityQueue) Get(timeout time.Duration) *QueueItem{
	timeout |= 30

	ticker := time.NewTicker(time.Second*timeout)
	defer ticker.Stop()

	select {
	case result := <- pq.ch:
		return result
	case <- ticker.C:
	}
	return nil
}

func (pq *PriorityQueue) PutNoWait(item *QueueItem){
	heap.Push(pq, item)
	//启用推送
	pq.startPushing()
}

//负责推送数据
func (pq *PriorityQueue) startPushing(){
	if pq.isPushing{
		return
	}
	go pq._startPushing()
}

func (pq *PriorityQueue) _startPushing(){
	for pq.Len()>0{
		pq.isPushing = true

		x := heap.Pop(pq)
		if x==nil{
			continue
		}
		item := x.(*QueueItem)
		pq.ch <- item
	}
	pq.isPushing = false
}
 
### 用法示例
 
//并发安全用法
func TestSendQueue(t *testing.T){
	q := NewPriorityQueue()
	q.PutNoWait(&QueueItem{
		Value: 	&packets.Packet{
						Header: &packets.SecsHeader{
						System: 1,
					},
					Data: []byte{},
				},
		Priority: 100,
		JoinTime: time.Now(),
	})
    item := q.Get(30)
    fmt.Println(item)
}
 

总结

 
上面已经满足我的需求场景了。 队列的删除、撤销,并不是我的应用场景,不在这里考虑,也不是队列该做的事。
 
 
 

评论(143) 阅读(770)

go实现Event事件

2021年5月16日 21:12

 

描述

 
go协程之间用通道通信,如何将它改造成事件形式。
 
 

需求描述

 
以下是我的需求
 
e := NewEvent()
//协程1等待结果, 10是等待时间
e.Wait(10)
//协程2发送结果
e.Send(xxxx)
 

原理

 
原理比较简单,不描述了
 

示例

package common

import (
	"go.uber.org/zap"
	"sync"
	"time"
)

var NOT_USED interface{} = struct{}{}

func NewEvent() *Event {
	e := &Event{
		log: zap.S(),
	}
	e.Reset()
	return e
}

type Event struct{
	waiters  sync.Map
	log *zap.SugaredLogger
}

func (e *Event) Wait(timeout time.Duration) interface{}{
	ticker := time.NewTicker(time.Second*timeout)
	defer ticker.Stop()

	resultChan := make(chan interface{}, 1)
	e.waiters.Store(resultChan, true)

	select{
	case result := <- resultChan:
		if result == NOT_USED{
			return nil
		}
		return result
	case <- ticker.C:
		close(resultChan)
	}

	e.waiters.Delete(resultChan)
	return nil
}

func (e *Event) Send(result interface{}){
	e.waiters.Range(func(k, v interface{})bool{
		resultChan := k.(chan interface{})
		select{
		case resultChan <- result:
			e.waiters.Delete(resultChan)
		default:
			e.log.Warnf("Event.Send %p resultChan=%d, result=%v", e, len(resultChan), result)
		}
		return true
	})
}

func (e *Event) Reset(){
	e.waiters.Range(func(k, v interface{})bool{
		resultChan := k.(chan interface{})
		resultChan <- NOT_USED
		e.waiters.Delete(resultChan)
		return true
	})
}
 
 
 

评论(119) 阅读(717)

golang如何正确关闭计时器

2021年5月16日 20:14

 

描述

 
* go的官方库计时器, 有一个陷阱,ticker.Stop()如果不注意会导致协程无法关闭。
* 如果我们想强制中断计时,该如何做?
 

示例:ticker.Stop()会导致协程无法结束

 
下面示例便于理解
 
func main(){
    ticker := time.NewTicker(time.Second*10)
    go func(){
        log.Println("go start")
        select{
        case <-ticker.C:
            log.Println("ticker stop")
        }
        //会发现这一句化一直不打印
        log.Println("go end")
    }()
    time.Sleep(1)
    //stop无法结束上面的协程
    ticker.Stop()
    time.Sleep(20)
    log.Println("main end")
}
 
 

使用标记值终止计时器

 
func WaitResponse(queueChan *chan Packet, timeout time.Duration) *Packet, error{
    ticker := time.NewTicker(time.Second*timeout)
    defer ticker.Stop()

	var response Packet

	var err error
	select{
	case response = <-*queueChan:
        if response == "标记值xxx"{
            return nil, errors.New("terminate response")
        }
	case <-ticker.C:
		err = errors.New("wait response timeout")
	}
	return &response, err
}

go WaitResponse(queueChan, time.Duration(60))

queueChan <- "标记值xxx"
 
 
 

 

评论(65) 阅读(606)