golang如何正确关闭计时器

2021年5月16日 20:14

 

描述

 
* go的官方库计时器, 有一个陷阱,ticker.Stop()如果不注意会导致协程无法关闭。
* 如果我们想强制中断计时,该如何做?
 

示例:ticker.Stop()会导致协程无法结束

 
下面示例便于理解
 
func main(){
    ticker := time.NewTicker(time.Second*10)
    go func(){
        log.Println("go start")
        select{
        case <-ticker.C:
            log.Println("ticker stop")
        }
        //会发现这一句化一直不打印
        log.Println("go end")
    }()
    time.Sleep(1)
    //stop无法结束上面的协程
    ticker.Stop()
    time.Sleep(20)
    log.Println("main end")
}
 
 

使用标记值终止计时器

 
func WaitResponse(queueChan *chan Packet, timeout time.Duration) *Packet, error{
    ticker := time.NewTicker(time.Second*timeout)
    defer ticker.Stop()

	var response Packet

	var err error
	select{
	case response = <-*queueChan:
        if response == "标记值xxx"{
            return nil, errors.New("terminate response")
        }
	case <-ticker.C:
		err = errors.New("wait response timeout")
	}
	return &response, err
}

go WaitResponse(queueChan, time.Duration(60))

queueChan <- "标记值xxx"
 
 
 

 

评论(0) 阅读(729)

golang用通道实现串口信号量

2021年5月15日 16:11

 

描述

 
串口通信,使用权该谁?需要采用信号量,进行控制。go如何实现?
 
 

原理

 
1. 缓冲通道的缓冲大小等于资源数
 
2. time计时器控制超时
 
 

示例

 
 
func NewPortAvailableSem()*PortAvailableSem{
	ps := &PortAvailableSem{
		PortChan: make(chan int, 1),
	}
	ps.PortChan <- 1
	return ps
}

type PortAvailableSem struct{
	PortChan chan int
}

func (ps *PortAvailableSem) Acquire(block bool, timeout int) bool{
	if block{
		ticker := time.NewTicker(time.Second*time.Duration(timeout))
		select{
		case  <- ps.PortChan:
			ticker.Stop()
			return true
		case <-ticker.C:
			return false
		}
	}else{
		select{
		case <- ps.PortChan:
			return true
		default:
			return false
		}
	}
	return false
}

func (ps *PortAvailableSem) Release(){
	if ps.Locked(){
		select{
			case ps.PortChan <- 1:
		}
	}
}

func (ps *PortAvailableSem) Locked() bool{
	return len(ps.PortChan) == 0
}
 
 

评论(0) 阅读(680)

golang超时等待

2021年5月08日 11:13

 描述

 
go中如何实现超时等待
 

python实现:队列超时等待

 
queue.get(block=True, timeout=45)
 
 

go实现:通道+time计时

 
select会阻塞两个case,直到其中一个返回。如果是response,则停止计时器; 否则,就返回错误
 
func _waitResponse(queueChan *chan int, timeout time.Duration) (int, error){
ticker := time.NewTicker(timeout)
var response int
var err error
select{
case response = <-*queueChan:
ticker.Stop()
case <-ticker.C:
err = errors.New("wait response timeout")
}
return response, err
}
 
 

评论(0) 阅读(714)

任务队列machinery用法

2021年5月01日 09:28

 

 描述

 
go实现的基于消息中间件的异步任务队列, 下面是学习笔记
 

使用概述

 
步骤1: 创建server,配置参数、注册task。(此处server只是个配置作用, 并不是单独的server进程)
 
步骤2: 启动worker
 
步骤3: 发送task
 
与celery的用法是完全一致的
 
 

创建server

 
func startServer() (*machinery.Server, error) {
cnf := &config.Config{
Broker:          "amqp://guest:guest@localhost:5672/",
DefaultQueue:    "machinery_tasks",
ResultBackend:   "amqp://guest:guest@localhost:5672/",
ResultsExpireIn: 3600,  //任务有效期
AMQP: &config.AMQPConfig{
Exchange:      "machinery_exchange",
ExchangeType:  "direct",
BindingKey:    "machinery_task",
PrefetchCount: 3,   //限定消费能力
},
}

// Create server instance
broker := amqpbroker.New(cnf)
backend := amqpbackend.New(cnf)
lock := eagerlock.New()     //任务锁
server := machinery.NewServer(cnf, broker, backend, lock)

// Register tasks
tasks := map[string]interface{}{
"add":               exampletasks.Add,
"multiply":          exampletasks.Multiply,
"sum_ints":          exampletasks.SumInts,
"sum_floats":        exampletasks.SumFloats,
"concat":            exampletasks.Concat,
"split":             exampletasks.Split,
"panic_task":        exampletasks.PanicTask,
"long_running_task": exampletasks.LongRunningTask,
}

return server, server.RegisterTasks(tasks)
}
 

创建worker

 
创建worker, 之后就可以启动了
 
func worker() error {
    //消费者的标记
consumerTag := "machinery_worker"

server, err := startServer()
if err != nil {
return err
}

    //第二个参数并发数, 0表示不限制
worker := server.NewWorker(consumerTag, 0)

    //钩子函数
errorhandler := func(err error) {}
pretaskhandler := func(signature *tasks.Signature) {}
posttaskhandler := func(signature *tasks.Signature) {}

worker.SetPostTaskHandler(posttaskhandler)
worker.SetErrorHandler(errorhandler)
worker.SetPreTaskHandler(pretaskhandler)
return worker.Launch()
}
 
启动结果
 
INFO: 2021/05/01 08:28:27 worker.go:58 Launching a worker with the following settings:
INFO: 2021/05/01 08:28:27 worker.go:59 - Broker: amqp://192.168.120.101:5672
INFO: 2021/05/01 08:28:27 worker.go:61 - DefaultQueue: machinery_tasks
INFO: 2021/05/01 08:28:27 worker.go:65 - ResultBackend: amqp://192.168.120.101:5672
INFO: 2021/05/01 08:28:27 worker.go:67 - AMQP: machinery_exchange
INFO: 2021/05/01 08:28:27 worker.go:68   - Exchange: machinery_exchange
INFO: 2021/05/01 08:28:27 worker.go:69   - ExchangeType: direct
INFO: 2021/05/01 08:28:27 worker.go:70   - BindingKey: machinery_task
INFO: 2021/05/01 08:28:27 worker.go:71   - PrefetchCount: 0
INFO: 2021/05/01 08:28:27 amqp.go:96 [*] Waiting for messages. To exit press CTRL+C
 

发送任务

 
server, _ := startServer()

signature := &tasks.Signature{
    Name: "add",
    Args: []tasks.Arg{
        {
            Type:  "int64",
            Value: 1,
        },
        {
            Type:  "int64",
            Value: 1,
        },
    },
}
asyncResult, _ := server.SendTask(signature)
fmt.Println(asyncResult.Get(time.Millisecond * 5))  //等待间隔,理论上是越小越好

//asyncResult.GetWithTimeout(time.Second*120, time.Millisecond * 5)   //第一个参数才是timeout
 

总结

 
以上就是machinery的基本用法,与celery基本一样, 更详细内容参考官方文档
 
 
 

评论(38) 阅读(1784)

golang如何实现静态变量的效果

2021年4月24日 20:46

 

说明

 
go的结构和方法接收者, 可以模拟类的概念, 然而go不支持静态变量
 
不能总是每次先创建一个结构体, 然后获取变量吧
 
该如何办?
 
 

如何创建"类"

 
先展示一下,平时我是如何创建"类"的
 
1.私有的结构体作为真实对象
 
2. 对外暴露方法,这个方法类似class可以作为参数传递,使用这个class就可以创建对象
 
3. BaseVar定义对外暴露的接口
 
4. 定义方法的interface,方便作为参数传递
 
type BaseVar interface {
    //对外暴露的接口
}

//方便传递
type VarDecriptor func(params ...interface{}) BaseVar

//等同于类
func VarBinary(params ...interface{}) BaseVar {
s := &varBinary{
FormatCode: 0o10,
TextCode: "B",
PreferredTypes: []reflect.Kind{reflect.Uint8, reflect.Slice},
}
return s
}

//实质上的对象
type varBinary struct{
BaseVar   //外部接口
FormatCode int
TextCode string
PreferredTypes  []reflect.Kind //bytes bytearray
value           []byte
}
 

静态变量有何意义

 
关联常量与类
 

解决方法

 
定义全局变量,以"类"名作为key,存放静态变量。runtime能动态获取"类"名,通过"类"名取值。
 
var gFormatCode = map[string]int{
"VarBinary": 1,
}

func FormatCode(f VarDecriptor) int {
    //取方法名
fname := runtime.FuncForPC(reflect.ValueOf(fc).Pointer()).Name()
names := strings.Split(fname, ".")
funcName := names[1]
    //从全局变量中取值
return gFormatCode[funcName]
}

//使用时通过FormatCode,传入上面的VarBinary,就可以获取常量
 

评论(54) 阅读(2254)

golang网络字节与基本类型转换

2021年4月15日 21:35

 

说明

 
网络通信需要将go基本数据类型转为字节. go如何做?
 

基本类型

c类型 go类型 字节长度
signed char int8     1
unsigned char  uint8    1
_Bool            bool     1
short            int16    2
unsigned short uint16   2
int              int32    4
unsigned int uint32   4
float            float32  4
long long int64    8
unsigned l long uint64   8
double           float64 8
 

有符号与无符号转换

 
* int8/uint->byte或 byte->int8/uint8
1个字节强制类型转换会超范围
 
// int8 ->byte
var a int8 = -1
byte(a)          // 正常 255

//byte->int8
int8(byte(255))  //异常 constant 255 overflows int8

// byte->int8
var a byte = 255
int8(a)         //正常 -1
 

通用方法Write/Read

 
* int8/uint8/int16/uint16/int32/uint32/int64/uint64/float32/float64->[]byte
 
var a int16 =1
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, &a)
fmt.Println(buf.Bytes())
 
* []byte ->int8/uint8/int16/uint16/int32/uint32/int64/uint64/float32/float64
 
b :=[]byte{64, 9, 30, 184, 81, 235, 133, 31}
var a float64
binary.Read(bytes.NewBuffer(b), binary.BigEndian, &a)
fmt.Println(a)
 

binary.BigEndian方法

import (
	"bytes"
	"encoding/binary"
	"reflect"
)

//=================================
//		数字-->字节
//=================================
func Number2Bytes(value interface{}) []byte{
	result := make([]byte, 0)

	switch v := value.(type){
	case int8:
		return append(result, uint8(v))
	case int16:
		binary.BigEndian.PutUint16(result, uint16(v))
		return result
	case int32:
		binary.BigEndian.PutUint32(result, uint32(v))
		return result
	case int64:
		binary.BigEndian.PutUint64(result, uint64(v))
		return result
	case uint8:
		return append(result, uint8(v))
	case uint16:
		binary.BigEndian.PutUint16(result, v)
		return result
	case uint32:
		binary.BigEndian.PutUint32(result, v)
		return result
	case uint64:
		binary.BigEndian.PutUint64(result, v)
		return result
	case float32:
		buf := new(bytes.Buffer)
		binary.Write(buf, binary.BigEndian, &v)
		return buf.Bytes()
	case float64:
		buf := new(bytes.Buffer)
		binary.Write(buf, binary.BigEndian, &v)
		return buf.Bytes()
	}
	return nil
}

//=================================
//		字节-->数字
//=================================
func Bytes2Number(data []byte, kind reflect.Kind) interface{}{
	switch kind{
	case reflect.Int8:
		return int8(data[0])
	case reflect.Int16:
		return int16(binary.BigEndian.Uint16(data))
	case reflect.Int32:
		return int32(binary.BigEndian.Uint32(data))
	case reflect.Int64:
		return int64(binary.BigEndian.Uint64(data))
	case reflect.Uint8:
		return data[0]
	case reflect.Uint16:
		return binary.BigEndian.Uint16(data)
	case reflect.Uint32:
		return binary.BigEndian.Uint32(data)
	case reflect.Uint64:
		return binary.BigEndian.Uint64(data)
	case reflect.Float32:
		var v float32
		buf := bytes.NewBuffer(data)
		binary.Read(buf, binary.BigEndian, &v)
		return v
	case reflect.Float64:
		var v float64
		buf := bytes.NewBuffer(data)
		binary.Read(buf, binary.BigEndian, &v)
		return v
	}
	return nil
}
 
 
 

评论(1) 阅读(1286)

golang的继承不是继承

2021年4月12日 19:46

 

问题

 
struct嵌套,内层struct方法访问同名的属性,这个属性是谁的?
 

示例

 
package main

import (
"fmt"
)

type ProductA struct{
Name string
}

func (p *ProductA) PrintName(){
fmt.Println("a:", p.Name)
}

type ProductB struct{
ProductA
Name string
Level string
}

func main() {
    b := ProductB{
Name: "name-b",
Level: "level",
}
b.PrintName()
}
// 仍然是ProductA的
//打印 a:
 

总结

 
1. go没有继承,只有组合
2. 只是提供了类似继承的便捷访问方式,不要被所谓的”继承“误导
 

评论(14) 阅读(1910)

go的反射有什么不同

2021年4月10日 11:09

说明

 
go的反射有什么不同, 与动态语言python有什么不同
 

go反射的不同

 
1. 首先, go是静态强类型。再怎么反射它也是静态语言,不支持动态获取类型,例如,通过字符串"struct_name",创建struct_name对象
 
2. go的反射,在于通过对象获取类型信息。例如,通过object,得到Type,然后获取Type的属性
 
3. reflect的入口是TypeOf和ValueOf。一切的前提是先有对象
 
 
 

评论(0) 阅读(760)

gorm模型定义原理借鉴分析

2021年4月10日 10:26

说明

 
python有元类概念,在定义db模型时,相当方便,极大简化代码
 
go中没有元类概念, gorm有模型定义,看看它怎么实现,能否借鉴
 

gorm原理

 
1. gorm运用了结构体标签,通过reflect获取标签内容,这是基本原理,这里不做介绍。
 
2. 关注它如何运用这些特性,借鉴使用
 
3. 直接分析源码太复杂,绕的路径太多。基于gorm模型定义最核心的代码,写一个小例子,展示gorm的用法,这也是我需要借鉴的地方
 
4. gorm所有的接口db.Create, db.Model,...最终都是调用schema.Parse
 
 
package main

import (
"fmt"
"gorm.io/gorm"
"gorm.io/gorm/schema"
"reflect"
"sync"
)

type Product struct {
gorm.Model
Code  string
Price uint
}

func main() {
product, _ := schema.Parse(&Product{}, &sync.Map{}, schema.NamingStrategy{})
fmt.Println(product)
fmt.Println(reflect.TypeOf(product))
}

//打印
//main.Product
//*schema.Schema
 

总结

 
1. 自定义的model,最终都被gorm转为Schema类型
 
2. model仅仅提供字段、标签信息
 
3. gorm并且实现了Schema的String() string方法,让打印看起来是Product
 
4. gorm的模型定义,比较encoding/json简单,比较适合借鉴使用
 
 
 

Tags: gorm
评论(1559) 阅读(8209)

nsq双机集群部署

2021年4月02日 13:27

问题

 
双机nsq如果部署集群,如何保证高可用性
 
1. 单节点nsqlookup故障?
 
2. 单节点nsq故障?
 
3. 消息丢失?
 

环境说明

 
两台机器
 
机器A 192.168.120.1
 
机器B 192.168.120.101
 

部署过程

 
部署方法
 
1. 机器A部署一套nsqlookup+nsqd
 
2. 机器B部署一套nsqlookup+nsqd
 
3. 生产者将消息同时写入两个nsqd
 
3. 消费者监听两个nsqlookup
 
结构图
 
 
机器A
 
nsqlookupd -broadcast-address 192.168.120.1
nsqd -lookupd-tcp-address=192.168.120.1:4160 -lookupd-tcp-address=192.168.120.101:4160 -broadcast-address 192.168.120.1
 
机器B
 
nsqlookupd -broadcast-address 192.168.120.101
nsqd -lookupd-tcp-address=192.168.120.1:4160 -lookupd-tcp-address=192.168.120.101:4160 -broadcast-address 192.168.120.101
 

pynsq测试脚本

 
生产者
 
import nsq
import tornado.ioloop
import time

def pub_message():
    writer.pub('test', str(time.strftime('%H:%M:%S')).encode("utf-8"), finish_pub)

def finish_pub(conn, data):
    print(data)

# 写入两个nsq好处: 1.防止nsqd单点故障  2.防止消息丢失
writer = nsq.Writer(['192.168.120.101:4150', '192.168.120.1:4150'])

tornado.ioloop.PeriodicCallback(pub_message, 1000).start()
nsq.run()
 
消费者
 
import nsq


def handler(message):
    print(message, message.id, message.timestamp, message.attempts, message.body)
    return True

# 防止nsqlookup故障
r = nsq.Reader(message_handler=handler,
               lookupd_http_addresses=['http://192.168.120.1:4161', 'http://192.168.120.101:4161'],
               topic='test', channel='abc', lookupd_poll_interval=15)

nsq.run()
 

总结

 
只要有一个nsqlookup和一个nsqd存活,系统就不会挂
 
 
 

Tags: nsq
评论(5) 阅读(1327)