go实现Event事件context版

2021年5月22日 08:27

描述

 
上一篇文章,写了go如何实现Event事件,有几个可以优化的地方.
sync.Map保存结果可以不用这么复杂.
改用context关闭通道更加方便
 
 

事件实现

 
package common

import (
	"context"
	"errors"
	"go.uber.org/zap"
	"time"
)

func NewEvent() *Event {
	e := &Event{
		log: zap.S(),
	}
	e.Reset()
	return e
}

type Event struct{
	//监听者
	waiters  []chan interface{}
	//结果
	result interface{}
	//上下文控制
	ctxBg 	context.Context
	ctxCancel context.CancelFunc
	//日志
	log *zap.SugaredLogger
}

//等待结果
func (e *Event) Wait(timeout time.Duration) interface{}{
	ctx, cancel := context.WithTimeout(e.ctxBg, time.Second*timeout)
	defer cancel()

	//等待者
	resultChan := make(chan interface{}, 1)
	e.waiters = append(e.waiters, resultChan)

	//等待
	select{
	case result := <- resultChan:
		return result
	case <- ctx.Done():
	}
	return nil
}

//发送结果
func (e *Event) Send(result interface{}) error{
	//防止发送多次
	if e.result !=nil{
		return errors.New("Event is used")
	}

	e.result = result
	for _, resultChan := range e.waiters{
		select{
		case resultChan <- result:
		default:
			e.log.Warnf("Event.Send %p resultChan=%d, result=%v", e, len(resultChan), result)
		}
	}
	return nil
}

//重置
func (e *Event) Reset(){
	if e.ctxBg !=nil{
		e.ctxCancel()
	}

	e.ctxBg, e.ctxCancel = context.WithCancel(context.Background())
	e.waiters = nil
	e.result = nil
}
 
 

评论(161) 阅读(755)

go如何实现并发安全的优先级队列

2021年5月20日 19:17

 

描述

 
go的如何实现优先级队列?
 
优先级别队列是否并发安全?
 

我的需求场景

 
实现了一个通信协议,收发消息。发送通道会阻塞。如果拥堵时,要控制拥堵消息的优先顺序。
 
优先级用Priority加上时间, Priority小优先, 早到优先。
 
 

原理

 
1. 通过实现官方接口heap, 得到优先级队列
 
2. 使用用chan进行控制并发控制,达到并发安全的效果
 
 

优先队列实现

 
package queue

import (
	"xxxx/packets"
	"container/heap"
	"time"
)

//---------------------------------------
//		优先队列(提供并发安全接口)
//---------------------------------------

func NewPriorityQueue() *PriorityQueue{
	q := &PriorityQueue{
		isPushing: false,
		ch: make(chan *QueueItem, 1),
	}
	return q
}


type PriorityQueue struct{
	//队列(最小堆)
	items []*QueueItem	//堆
	ch chan *QueueItem	//同步控制
	isPushing bool	//推送状态
}

// 数据项
type QueueItem struct {
	Value    	*packets.Packet	// value
	Priority 	int    				// 优先级(越小越靠前)
	JoinTime    time.Time			// 加入时间
	index 		int  				// 索引
}

//---------------------------------------
//		heap接口
//---------------------------------------
func (pq PriorityQueue) Len() int { return len(pq.items) }

func (pq PriorityQueue) Less(i, j int) bool {
	// priority越小越靠前
	if pq.items[i].Priority != pq.items[j].Priority{
		return pq.items[i].Priority < pq.items[j].Priority
	}else{
		return pq.items[i].JoinTime.Sub(pq.items[j].JoinTime) <0
	}
}

func (pq PriorityQueue) Swap(i, j int) {
	pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
	pq.items[i].index = i
	pq.items[j].index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
	n := len(pq.items)
	item := x.(*QueueItem)
	item.index = n
	pq.items = append(pq.items, item)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old.items)
	if n==0{
		return nil
	}
	item := old.items[n-1]
	old.items[n-1] = nil  // avoid memory leak
	item.index = -1 // for safety
	pq.items = old.items[0 : n-1]
	return item
}

// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *QueueItem, value *packets.Packet, priority int) {
	item.Value = value
	item.Priority = priority
	heap.Fix(pq, item.index)
}

//---------------------------------------
//		同步接口
//---------------------------------------
func (pq *PriorityQueue) Get(timeout time.Duration) *QueueItem{
	timeout |= 30

	ticker := time.NewTicker(time.Second*timeout)
	defer ticker.Stop()

	select {
	case result := <- pq.ch:
		return result
	case <- ticker.C:
	}
	return nil
}

func (pq *PriorityQueue) PutNoWait(item *QueueItem){
	heap.Push(pq, item)
	//启用推送
	pq.startPushing()
}

//负责推送数据
func (pq *PriorityQueue) startPushing(){
	if pq.isPushing{
		return
	}
	go pq._startPushing()
}

func (pq *PriorityQueue) _startPushing(){
	for pq.Len()>0{
		pq.isPushing = true

		x := heap.Pop(pq)
		if x==nil{
			continue
		}
		item := x.(*QueueItem)
		pq.ch <- item
	}
	pq.isPushing = false
}
 
### 用法示例
 
//并发安全用法
func TestSendQueue(t *testing.T){
	q := NewPriorityQueue()
	q.PutNoWait(&QueueItem{
		Value: 	&packets.Packet{
						Header: &packets.SecsHeader{
						System: 1,
					},
					Data: []byte{},
				},
		Priority: 100,
		JoinTime: time.Now(),
	})
    item := q.Get(30)
    fmt.Println(item)
}
 

总结

 
上面已经满足我的需求场景了。 队列的删除、撤销,并不是我的应用场景,不在这里考虑,也不是队列该做的事。
 
 
 

评论(125) 阅读(699)

go实现Event事件

2021年5月16日 21:12

 

描述

 
go协程之间用通道通信,如何将它改造成事件形式。
 
 

需求描述

 
以下是我的需求
 
e := NewEvent()
//协程1等待结果, 10是等待时间
e.Wait(10)
//协程2发送结果
e.Send(xxxx)
 

原理

 
原理比较简单,不描述了
 

示例

package common

import (
	"go.uber.org/zap"
	"sync"
	"time"
)

var NOT_USED interface{} = struct{}{}

func NewEvent() *Event {
	e := &Event{
		log: zap.S(),
	}
	e.Reset()
	return e
}

type Event struct{
	waiters  sync.Map
	log *zap.SugaredLogger
}

func (e *Event) Wait(timeout time.Duration) interface{}{
	ticker := time.NewTicker(time.Second*timeout)
	defer ticker.Stop()

	resultChan := make(chan interface{}, 1)
	e.waiters.Store(resultChan, true)

	select{
	case result := <- resultChan:
		if result == NOT_USED{
			return nil
		}
		return result
	case <- ticker.C:
		close(resultChan)
	}

	e.waiters.Delete(resultChan)
	return nil
}

func (e *Event) Send(result interface{}){
	e.waiters.Range(func(k, v interface{})bool{
		resultChan := k.(chan interface{})
		select{
		case resultChan <- result:
			e.waiters.Delete(resultChan)
		default:
			e.log.Warnf("Event.Send %p resultChan=%d, result=%v", e, len(resultChan), result)
		}
		return true
	})
}

func (e *Event) Reset(){
	e.waiters.Range(func(k, v interface{})bool{
		resultChan := k.(chan interface{})
		resultChan <- NOT_USED
		e.waiters.Delete(resultChan)
		return true
	})
}
 
 
 

评论(77) 阅读(570)

golang如何正确关闭计时器

2021年5月16日 20:14

 

描述

 
* go的官方库计时器, 有一个陷阱,ticker.Stop()如果不注意会导致协程无法关闭。
* 如果我们想强制中断计时,该如何做?
 

示例:ticker.Stop()会导致协程无法结束

 
下面示例便于理解
 
func main(){
    ticker := time.NewTicker(time.Second*10)
    go func(){
        log.Println("go start")
        select{
        case <-ticker.C:
            log.Println("ticker stop")
        }
        //会发现这一句化一直不打印
        log.Println("go end")
    }()
    time.Sleep(1)
    //stop无法结束上面的协程
    ticker.Stop()
    time.Sleep(20)
    log.Println("main end")
}
 
 

使用标记值终止计时器

 
func WaitResponse(queueChan *chan Packet, timeout time.Duration) *Packet, error{
    ticker := time.NewTicker(time.Second*timeout)
    defer ticker.Stop()

	var response Packet

	var err error
	select{
	case response = <-*queueChan:
        if response == "标记值xxx"{
            return nil, errors.New("terminate response")
        }
	case <-ticker.C:
		err = errors.New("wait response timeout")
	}
	return &response, err
}

go WaitResponse(queueChan, time.Duration(60))

queueChan <- "标记值xxx"
 
 
 

 

评论(50) 阅读(528)

golang用通道实现串口信号量

2021年5月15日 16:11

 

描述

 
串口通信,使用权该谁?需要采用信号量,进行控制。go如何实现?
 
 

原理

 
1. 缓冲通道的缓冲大小等于资源数
 
2. time计时器控制超时
 
 

示例

 
 
func NewPortAvailableSem()*PortAvailableSem{
	ps := &PortAvailableSem{
		PortChan: make(chan int, 1),
	}
	ps.PortChan <- 1
	return ps
}

type PortAvailableSem struct{
	PortChan chan int
}

func (ps *PortAvailableSem) Acquire(block bool, timeout int) bool{
	if block{
		ticker := time.NewTicker(time.Second*time.Duration(timeout))
		select{
		case  <- ps.PortChan:
			ticker.Stop()
			return true
		case <-ticker.C:
			return false
		}
	}else{
		select{
		case <- ps.PortChan:
			return true
		default:
			return false
		}
	}
	return false
}

func (ps *PortAvailableSem) Release(){
	if ps.Locked(){
		select{
			case ps.PortChan <- 1:
		}
	}
}

func (ps *PortAvailableSem) Locked() bool{
	return len(ps.PortChan) == 0
}
 
 

评论(47) 阅读(480)

golang超时等待

2021年5月08日 11:13

 描述

 
go中如何实现超时等待
 

python实现:队列超时等待

 
queue.get(block=True, timeout=45)
 
 

go实现:通道+time计时

 
select会阻塞两个case,直到其中一个返回。如果是response,则停止计时器; 否则,就返回错误
 
func _waitResponse(queueChan *chan int, timeout time.Duration) (int, error){
ticker := time.NewTicker(timeout)
var response int
var err error
select{
case response = <-*queueChan:
ticker.Stop()
case <-ticker.C:
err = errors.New("wait response timeout")
}
return response, err
}
 
 

评论(35) 阅读(513)

任务队列machinery用法

2021年5月01日 09:28

 

 描述

 
go实现的基于消息中间件的异步任务队列, 下面是学习笔记
 

使用概述

 
步骤1: 创建server,配置参数、注册task。(此处server只是个配置作用, 并不是单独的server进程)
 
步骤2: 启动worker
 
步骤3: 发送task
 
与celery的用法是完全一致的
 
 

创建server

 
func startServer() (*machinery.Server, error) {
cnf := &config.Config{
Broker:          "amqp://guest:guest@localhost:5672/",
DefaultQueue:    "machinery_tasks",
ResultBackend:   "amqp://guest:guest@localhost:5672/",
ResultsExpireIn: 3600,  //任务有效期
AMQP: &config.AMQPConfig{
Exchange:      "machinery_exchange",
ExchangeType:  "direct",
BindingKey:    "machinery_task",
PrefetchCount: 3,   //限定消费能力
},
}

// Create server instance
broker := amqpbroker.New(cnf)
backend := amqpbackend.New(cnf)
lock := eagerlock.New()     //任务锁
server := machinery.NewServer(cnf, broker, backend, lock)

// Register tasks
tasks := map[string]interface{}{
"add":               exampletasks.Add,
"multiply":          exampletasks.Multiply,
"sum_ints":          exampletasks.SumInts,
"sum_floats":        exampletasks.SumFloats,
"concat":            exampletasks.Concat,
"split":             exampletasks.Split,
"panic_task":        exampletasks.PanicTask,
"long_running_task": exampletasks.LongRunningTask,
}

return server, server.RegisterTasks(tasks)
}
 

创建worker

 
创建worker, 之后就可以启动了
 
func worker() error {
    //消费者的标记
consumerTag := "machinery_worker"

server, err := startServer()
if err != nil {
return err
}

    //第二个参数并发数, 0表示不限制
worker := server.NewWorker(consumerTag, 0)

    //钩子函数
errorhandler := func(err error) {}
pretaskhandler := func(signature *tasks.Signature) {}
posttaskhandler := func(signature *tasks.Signature) {}

worker.SetPostTaskHandler(posttaskhandler)
worker.SetErrorHandler(errorhandler)
worker.SetPreTaskHandler(pretaskhandler)
return worker.Launch()
}
 
启动结果
 
INFO: 2021/05/01 08:28:27 worker.go:58 Launching a worker with the following settings:
INFO: 2021/05/01 08:28:27 worker.go:59 - Broker: amqp://192.168.120.101:5672
INFO: 2021/05/01 08:28:27 worker.go:61 - DefaultQueue: machinery_tasks
INFO: 2021/05/01 08:28:27 worker.go:65 - ResultBackend: amqp://192.168.120.101:5672
INFO: 2021/05/01 08:28:27 worker.go:67 - AMQP: machinery_exchange
INFO: 2021/05/01 08:28:27 worker.go:68   - Exchange: machinery_exchange
INFO: 2021/05/01 08:28:27 worker.go:69   - ExchangeType: direct
INFO: 2021/05/01 08:28:27 worker.go:70   - BindingKey: machinery_task
INFO: 2021/05/01 08:28:27 worker.go:71   - PrefetchCount: 0
INFO: 2021/05/01 08:28:27 amqp.go:96 [*] Waiting for messages. To exit press CTRL+C
 

发送任务

 
server, _ := startServer()

signature := &tasks.Signature{
    Name: "add",
    Args: []tasks.Arg{
        {
            Type:  "int64",
            Value: 1,
        },
        {
            Type:  "int64",
            Value: 1,
        },
    },
}
asyncResult, _ := server.SendTask(signature)
fmt.Println(asyncResult.Get(time.Millisecond * 5))  //等待间隔,理论上是越小越好

//asyncResult.GetWithTimeout(time.Second*120, time.Millisecond * 5)   //第一个参数才是timeout
 

总结

 
以上就是machinery的基本用法,与celery基本一样, 更详细内容参考官方文档
 
 
 

评论(24) 阅读(634)

golang如何实现静态变量的效果

2021年4月24日 20:46

 

说明

 
go的结构和方法接收者, 可以模拟类的概念, 然而go不支持静态变量
 
不能总是每次先创建一个结构体, 然后获取变量吧
 
该如何办?
 
 

如何创建"类"

 
先展示一下,平时我是如何创建"类"的
 
1.私有的结构体作为真实对象
 
2. 对外暴露方法,这个方法类似class可以作为参数传递,使用这个class就可以创建对象
 
3. BaseVar定义对外暴露的接口
 
4. 定义方法的interface,方便作为参数传递
 
type BaseVar interface {
    //对外暴露的接口
}

//方便传递
type VarDecriptor func(params ...interface{}) BaseVar

//等同于类
func VarBinary(params ...interface{}) BaseVar {
s := &varBinary{
FormatCode: 0o10,
TextCode: "B",
PreferredTypes: []reflect.Kind{reflect.Uint8, reflect.Slice},
}
return s
}

//实质上的对象
type varBinary struct{
BaseVar   //外部接口
FormatCode int
TextCode string
PreferredTypes  []reflect.Kind //bytes bytearray
value           []byte
}
 

静态变量有何意义

 
关联常量与类
 

解决方法

 
定义全局变量,以"类"名作为key,存放静态变量。runtime能动态获取"类"名,通过"类"名取值。
 
var gFormatCode = map[string]int{
"VarBinary": 1,
}

func FormatCode(f VarDecriptor) int {
    //取方法名
fname := runtime.FuncForPC(reflect.ValueOf(fc).Pointer()).Name()
names := strings.Split(fname, ".")
funcName := names[1]
    //从全局变量中取值
return gFormatCode[funcName]
}

//使用时通过FormatCode,传入上面的VarBinary,就可以获取常量
 

评论(122) 阅读(1528)

golang网络字节与基本类型转换

2021年4月15日 21:35

 

说明

 
网络通信需要将go基本数据类型转为字节. go如何做?
 

基本类型

c类型 go类型 字节长度
signed char int8     1
unsigned char  uint8    1
_Bool            bool     1
short            int16    2
unsigned short uint16   2
int              int32    4
unsigned int uint32   4
float            float32  4
long long int64    8
unsigned l long uint64   8
double           float64 8
 

有符号与无符号转换

 
* int8/uint->byte或 byte->int8/uint8
1个字节强制类型转换会超范围
 
// int8 ->byte
var a int8 = -1
byte(a)          // 正常 255

//byte->int8
int8(byte(255))  //异常 constant 255 overflows int8

// byte->int8
var a byte = 255
int8(a)         //正常 -1
 

通用方法Write/Read

 
* int8/uint8/int16/uint16/int32/uint32/int64/uint64/float32/float64->[]byte
 
var a int16 =1
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, &a)
fmt.Println(buf.Bytes())
 
* []byte ->int8/uint8/int16/uint16/int32/uint32/int64/uint64/float32/float64
 
b :=[]byte{64, 9, 30, 184, 81, 235, 133, 31}
var a float64
binary.Read(bytes.NewBuffer(b), binary.BigEndian, &a)
fmt.Println(a)
 

binary.BigEndian方法

import (
	"bytes"
	"encoding/binary"
	"reflect"
)

//=================================
//		数字-->字节
//=================================
func Number2Bytes(value interface{}) []byte{
	result := make([]byte, 0)

	switch v := value.(type){
	case int8:
		return append(result, uint8(v))
	case int16:
		binary.BigEndian.PutUint16(result, uint16(v))
		return result
	case int32:
		binary.BigEndian.PutUint32(result, uint32(v))
		return result
	case int64:
		binary.BigEndian.PutUint64(result, uint64(v))
		return result
	case uint8:
		return append(result, uint8(v))
	case uint16:
		binary.BigEndian.PutUint16(result, v)
		return result
	case uint32:
		binary.BigEndian.PutUint32(result, v)
		return result
	case uint64:
		binary.BigEndian.PutUint64(result, v)
		return result
	case float32:
		buf := new(bytes.Buffer)
		binary.Write(buf, binary.BigEndian, &v)
		return buf.Bytes()
	case float64:
		buf := new(bytes.Buffer)
		binary.Write(buf, binary.BigEndian, &v)
		return buf.Bytes()
	}
	return nil
}

//=================================
//		字节-->数字
//=================================
func Bytes2Number(data []byte, kind reflect.Kind) interface{}{
	switch kind{
	case reflect.Int8:
		return int8(data[0])
	case reflect.Int16:
		return int16(binary.BigEndian.Uint16(data))
	case reflect.Int32:
		return int32(binary.BigEndian.Uint32(data))
	case reflect.Int64:
		return int64(binary.BigEndian.Uint64(data))
	case reflect.Uint8:
		return data[0]
	case reflect.Uint16:
		return binary.BigEndian.Uint16(data)
	case reflect.Uint32:
		return binary.BigEndian.Uint32(data)
	case reflect.Uint64:
		return binary.BigEndian.Uint64(data)
	case reflect.Float32:
		var v float32
		buf := bytes.NewBuffer(data)
		binary.Read(buf, binary.BigEndian, &v)
		return v
	case reflect.Float64:
		var v float64
		buf := bytes.NewBuffer(data)
		binary.Read(buf, binary.BigEndian, &v)
		return v
	}
	return nil
}
 
 
 

评论(52) 阅读(1092)

golang的继承不是继承

2021年4月12日 19:46

 

问题

 
struct嵌套,内层struct方法访问同名的属性,这个属性是谁的?
 

示例

 
package main

import (
"fmt"
)

type ProductA struct{
Name string
}

func (p *ProductA) PrintName(){
fmt.Println("a:", p.Name)
}

type ProductB struct{
ProductA
Name string
Level string
}

func main() {
    b := ProductB{
Name: "name-b",
Level: "level",
}
b.PrintName()
}
// 仍然是ProductA的
//打印 a:
 

总结

 
1. go没有继承,只有组合
2. 只是提供了类似继承的便捷访问方式,不要被所谓的”继承“误导
 

评论(206) 阅读(1409)