celery5.2.1以下版本任务调用多耗费1秒

2021年12月28日 10:14

问题

 
    5.0.5版本celery存在一个缺陷, 调用任务耗时多用1秒
 
    这个问题在5.2.0, 5.1.2同样存在
 
    没理解为什么到5.2.1版本才解决,甚至一度把celery排除python技术栈
 

环境描述

 
 
python3.6
celery5.0.5
windows 32位
 
 

调用celery任务代码摘要

 
# 1.send_task返回AsyncResult
# 2.AsyncResult的get()等待返回结果
# 3.get()会比真实多耗费1秒,并且每次请求都会出现

from celery import Celery
class xxxCelery(Celery):
    def call_xxx(self, name, timeout=120, **kwargs):
        LOG.info("send_task: %s" % locals())
        start = time.time()
        r = self.send_task(name, **kwargs)
        g = eventlet.spawn(r.get, timeout=timeout)
        result = g.wait()
        print("cost: %s" % time.time()-start)
        return
 

两个版本比对

 
分析celery源码之后,可以知道问题在drain_events()内部, 比较5.2.0和5.2.1版本
 
#celery/backends/asynchronuse.py

class greenletDrainer(Drainer):
    ...
    def run(self):
        self._started.set()
        while not self._stopped.is_set():
            try:
                self.result_consumer.drain_events(timeout=1)
                # 新增了两句, 估摸着是这个问题
                self._send_drain_complete_event()
                self._create_drain_complete_event()
            except socket.timeout:
                pass
        self._shutdown.set()
 
 

解决办法

 
    celery升级到5.2.1
 
    python要升级到3.7以上版本(celery要求python3.7以上版本)
 

解决效果

 
    耗时从1000多ms变成了30ms
 

Tags: celery
评论(75) 阅读(2486)

openssh8.6默认不支持公钥ssh

2021年12月02日 21:51

 

 起因

 
    最近升级了自己的linux后, 更新gogs代码时,发现用不了了
 
    反复测试原因,发现是最新版本openssh8.6关闭了公钥登录
 
 

错误提示

 
尝试了各种办法,会发现始终提示ssh登录失败
 
username@ip: Permission denied (publickey).
 

查看ssh详细信息

 
输出ssh调试信息,查看失败的具体原因
 
ssh -Tv username@ip

#发现了一条提示,这就是登录失败的原因
...
debug1: Authentications that can continue: publickey
debug1: Next authentication method: publickey
debug1: Offering public key: /home/xxx/.ssh/id_rsa RSA SHA256:0IxiHWniJDBbsM8x0zx+zliWrl7PINcALzFFqVgKX/M agent
debug1: send_pubkey_test: no mutual signature algorithm
...
 
 

解决办法

 
 
sudo vim /etc/ssh/ssh_config

#增加一条PubKeyAcceptedKeyTypes

 Host *
    PubKeyAcceptedKeyTypes +ssh-rsa
#   ForwardAgent no
#   ForwardX11 no
#   PasswordAuthentication yes
#   HostbasedAuthentication no
#   GSSAPIAuthentication no
#   GSSAPIDelegateCredentials no
#   GSSAPIKeyExchange no
#   GSSAPITrustDNS no
#   BatchMode no
 
 

问题原因

 
    rsa有安全问题,新版本openssh,默认不支持rsa公钥登录,需要自己打开
 
 

Tags: gogs
评论(3) 阅读(1525)

基于ectd实现go的服务注册

2021年11月27日 18:42

 

描述

 
    基于etcd实现go程序的服务注册,所看的学习资料均比较简单, 实际中要经过反复测试,自己写了一个先用着,有坑再填.
 

需求

 
1. 注册一个服务: 程序启动时
2. 注销服务: a. 程序异常退出时, 自动注销, 有5秒TTL延迟; b. 调用Stop()接口主动注销
3. 健壮性: a.ectd单节点重启, ectd本身支持; b. etcd全部重启后, 能够恢复正常
4. key的格式: 前缀/ip/pid
5. value: 字符串.(可以先转为json, 再转string)
 
 

创建etcd客户端

 

	
//不是重点, 摘要如下代码
 
cli, err := clientv3.New(clientv3.Config{
Endpoints:   ec.Endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
 

创建Service对象, 包含服务注册

 
package main

import (
	"context"
	"errors"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/clientv3util"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"net"
	"os"
	"strings"
	"sync"
	"test/etcd"
	"time"
)


var IP string
var PID int


func init(){
	//日志
	encoderConfig := zap.NewProductionEncoderConfig()
	encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
	encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
	encoder := zapcore.NewConsoleEncoder(encoderConfig)

	//创建
	var clevel zapcore.Level
	clevel.Set("debug")
	log := zap.New(zapcore.NewCore(encoder, zapcore.AddSync(os.Stdout), clevel))
	//设为全局
	zap.ReplaceGlobals(log)

	//
	IP, _ = GetLocalIP()
	PID = os.Getpid()
}

func GetLocalIP() (string,error){
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		zap.S().Warn(err)
		return "", err
	}
	for _, addr := range addrs{
		if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback(){
			if ipnet.IP.To4() != nil {
				return ipnet.IP.String(), nil
			}
		}
	}
	return "", errors.New("unable to determine local ip")
}


func RegisterService(endpoints []string, key string, value string) *Service{
	s := &Service{
		Value: value,
		endpoints: endpoints,
	}

	s.TTL = 5
	s.Key = strings.Join([]string{key, IP, fmt.Sprintf("%d", PID)}, "/")

	s.init()

	return s
}


type Service struct{
	Key          	string		//key 格式: /前缀/名称/ip/pid
	Value           string		//存放信息
	endpoints    	[]string	//etcd地址
	//心跳
	IsAlive      	bool		//是否存活
	heartCancel 	context.CancelFunc	//取消心跳
	//租约
	TTL             int64		//租约(秒)
	grant			*clientv3.LeaseGrantResponse	//租约
}


func (s *Service) init(){
	//启用etcd连接
	etcd.EnableEtcd(s.endpoints)

	//心跳
	go s.start()
}

func (s *Service) start(){
	defer func(){
		s.IsAlive = false

		//注销服务
		s.deleteService()
		//关闭续约(如果共用会误关?)
		etcd.Cli().Lease.Close()
	}()

	var aliveRsp <-chan *clientv3.LeaseKeepAliveResponse
	var err error
	var ctx context.Context
	for{
		if !s.IsAlive{
			// 注册服务
			s.registerService()

			// 续期信号chan
			aliveRsp, err = etcd.Cli().KeepAlive(context.TODO(), s.grant.ID)
			if err != nil{
				zap.S().Warn(err)
			}

			ctx, s.heartCancel = context.WithCancel(context.Background())
		}

		// 监听心跳信号
		select{
		case rsp := <- aliveRsp:
            //异常时无信号
			if rsp ==nil{
				s.IsAlive = false
				zap.S().Info("service missing signal")
				time.Sleep(time.Second*5)
				continue
			}

			s.IsAlive = true
			zap.S().Debugf("service alive %v", rsp.ID)
		case <- ctx.Done():
			zap.S().Info("service stopping")
			return
		}
	}
}

//停止

func (s *Service) Stop(){
	s.heartCancel()
}
 
[佛說大乘無量壽莊嚴清淨平等覺經pdf](http://www.sxjy360.top/page-download/)
[净土大经科注2014-doc](http://www.sxjy360.top/page-download/)
[此生必看的科学实验-水知道答案](http://www.sxjy360.top/page-download/)
[印光大师十念法(胡小林主讲第1集)](http://www.sxjy360.top/page-download/)
 

Tags: etcd
评论(86) 阅读(1493)

go和python解码msgpack不一致

2021年11月27日 18:37

 

描述

 
    有一个消息字典类型, 使用go进行编码, 传入消息中间件。同时有一个python程序监听, 监听到消息之后进行解码, 遇到了编码问题
 
 

环境

 
go使用:     github.com/vmihailenco/msgpack/v5 v5.3.4
python使用: msgpack==1.0.2
 
 

python解码

 
# go编码的对象是个字典,value可能包含了数组,并且数组类型复杂
msgpack.unpackb(xxx)
 
 

错误提示

 
UnicodeDecodeError: 'utf-8' codec can't decode byte 0bx0 in position 0: invalid start byte
 

解决办法

 
msgpack.unpackb(xxx, raw=True)

#raw=True  解析为python字节类型
#raw=False 解析为python字符串, 默认使用utf-8编码
 

评论(794) 阅读(3038)

golang位移操作的一个小坑

2021年11月25日 16:08

问题描述

 
    现有int16类型的整数,将它转为两个字节存放。在还原时,一不小心会就被坑了。
 
 

错误用法

 
length := int(keyBytes[cursor] + keyBytes[cursor+1] <<8)
 

正确用法

 
keyBytes := []byte{ 16, 1}
length := int(uint16(keyBytes[cursor]) + uint16(keyBytes[cursor+1]) <<8)
 

错误原因

 
字节byte位移时类型不会自动变化, 先要转换类型再位移。
 
 

评论(20) 阅读(1007)

go版本的supervisord

2021年11月14日 13:27

 

描述

 
python版本的supervisrod, 在linux、windows系统中离线安装都不怎么方便.  是否有go版本的呢?
 

github地址

 
[https://github.com/ochinchina/supervisord/](https://github.com/ochinchina/supervisord/)
 

踩坑记录

 
* 编译32版本, 有些报错, 需要修改源码
 
* 在ui界面快速点击start/stop, 进程可能会启动多个。需要加锁处理处理.
 
## 用法
 
* 配置文件查找顺序
 
$CWD/supervisord.conf
$CWD/etc/supervisord.conf
/etc/supervisord.conf
/etc/supervisor/supervisord.conf (since Supervisor 3.3.0)
../etc/supervisord.conf (Relative to the executable)
../supervisord.conf (Relative to the executable)
 
 
* 最简单配置supervisord.conf
 
[program:test]
command = /your/program args
 
 
* 常用配置
 
[inet_http_server]          ;http接口
port=:9002
;username=xxxx
;password=xxxx

[supervisorctl]             ;ui界面
serverurl=http://127.0.0.1:9002

[supervisord]               ;日志
logfile=%(here)s/../log/supervisord.log
logfileMaxbytes=50MB        ;文件大小
logfileBackups=10           ;10个文件
loglevel=info               ;日志级别

[program:middleware1]       ;进程
command = %(here)s/../middleware1.exe   ;here指的是supervisord.conf的路径,而不是supervisord.exe的路径
autorestart = true
numprocs=1                  ;进程数量

[program:programname1]
command = %(here)s/../programname1.exe
autorestart = true
numprocs=1
depends_on=middleware1      ;依赖进程


[program:programname2]
command = %(here)s/../programname2.exe
autorestart = true
numprocs=1
depends_on=middleware1
 

Tags: supervisord go
评论(46) 阅读(1186)

nats批量publish最后一条疑似丢失

2021年11月14日 13:24

 

问题描述

 
    写了一个测试脚本,从数据读取一组数据,然后逐条publish到nats-server中, 确定每条都publish了
 
    有另外一个程序中subscribe订阅,在该程序发现最后一条一直没收到
 
 

环境

 
* go: 1.16
 
* nats-server
 

程序摘要

 
* 发布
 
for _, record := range records{
    ...
    gNatsConn.Publish(Topic, record)
    time.Sleep(time.Second*0.1)    //间隔越小出现的概率越大. 间隔1秒好像就没有
}
 
* 订阅
 
natsConn.Subscribe(topic, func(m *nats.Msg){
    fmt.Println(m.Data)
    ....
}
 

原因以及解决办法

 
    nats在发布消息时,有应该是用了缓存通道, 大小是1。最后一条数据有可能没被读走
 
    发布频率较高,最后需要flush一下
 
 
 

Tags: nats go
评论(13) 阅读(1176)

influxdb遇到时间点存储不了

2021年9月23日 14:33

 

 问题描述

 
    有一份mysql存储的历史数据,准备把它转为influxdb存储, 时间点用的是mysql表中记录的创建时间,创建时间是时许的。
 

环境描述

 
* influxdb: 2.0.8
 
* 开发语言: go
 
* influx客户端: influxdb-client-go 2.5.0
 
 

关键代码摘要

 
    使用如下代码, 创建point,发现存储不了。
 
point := influxdb2.NewPoint("table_name",
    map[string]string{
        ...
    },
    map[string]interface{}{
        ....
    },
    ts,     //对应mysql表中记录的创建时间
    )
 

解决过程

 
    开始以为数据是几年前的, 时间太长存储不了。 后来用time.Time().Add(-xxx), 把时间移到3年前, 结果能存储
 
 

解决办法

 
    原因是influx的精度是纳秒, 存入的时间精度是秒, 在Influx中无法存储
 
    解决办法是加1纳秒, ts.Add(time.Nanosecond*1)
 

评论(31) 阅读(1657)

influxdb如何把point中的多个field一起查出来

2021年8月29日 14:39

 

问题描述

 
    influx查询包含了水平拆分、垂直拆分. 查询结果的每一条记录record,只对应一个field
 
    写入一个point时包含多个field, 查询时如何将这几个field一起查出来呢?
 

写入示例

 
point := influxdb2.NewPoint("history",
    map[string]string{
        "Version": DedefaultVersion,
        "Name": sh.Name,
        "Active": fmt.Sprintf("%v", sh.Active),
        "PortIdx": fmt.Sprintf("%d", sh.PortIdx),
        "Port": fmt.Sprintf("%v", sh.Port),
    },
    map[string]interface{}{
        "MsgId": fmt.Sprintf("%d", sh.MsgId),
        "Content": sh.Content,
    },
    time.Now(),
    )
 
* 查询
 
from(bucket:"my-bucket")
|> range(start: -10m)
|> filter(fn: (r) => (r._measurement=="history") and (r.Name=="xxxx") )
 
用这个方法查询, 每个record只包含一个_field
 
 

解决方法: 分组

 
    分组之后的record,会多一个table属性。table相同,则为一组。也就是一个point的field。
 
* 按写入时间分组
 
|> group(columns: ["_time"])
 
* 增加id标签,按id分组
 
为每个point增加一个id标签, 然后按id分组
 
 
 
 

Tags: influxdb
评论(89) 阅读(2230)

golang编译-ldflags -H windowsgui被认为是病毒

2021年8月26日 17:22

 
 

介绍

 
    使用walk为程序做了一个windows界面, cmd窗口需要隐藏。使用了 go build -ldflags="-H windowsgui"
 
    结果exe程序被杀毒软件认为是病毒
 

为什么会认为是病毒?

 
    隐藏cmd窗口, 双击exe,可以直接启动,并且看不到任何窗口。这不就是病毒的喜欢干的事吗.
 
    所以有些杀毒软件, 干脆通杀, 使用了 "-H windowsgui" 直接被认为是病毒
 
    我们这些正常的需求该咋办呢?
 

解决办法

 
    在程序中,使用代码隐藏cmd窗口
 
 
import "github.com/lxn/win"

//隐藏cmd窗口
win.ShowWindow(win.GetConsoleWindow(), win.SW_HIDE)
 
    缺点:启动时会有很短暂的黑框闪现,不过问题不大
 
 

评论(65) 阅读(2639)