tomli支持数组混合类型

2022年8月05日 13:57

起因

    
    使用python的toml包解析toml配置文件。配置文件中使用了混合类型数组,结果程序报错。
 

原因与解决方法

 
1. toml0.5规范不支持数组混合类型, toml1.0规范才支持数组混合类型。
 
2. python的toml解析包有多种,toml包不支持1.0,所以不支持混合类型
 
3. 建议使用rtoml, tomli等
 

混合类型示例

 
a=[1, 2.1]
b=["a", ["b", "c"]]
c=["a", {"b": 1}]
 

rtoml用法

obj = rtoml.load("""
a=[1, 2.1]
""")
print(obj)
 

Tags: toml
评论(175) 阅读(3024)

celery5.2.1以下版本任务调用多耗费1秒

2021年12月28日 10:14

问题

 
    5.0.5版本celery存在一个缺陷, 调用任务耗时多用1秒
 
    这个问题在5.2.0, 5.1.2同样存在
 
    没理解为什么到5.2.1版本才解决,甚至一度把celery排除python技术栈
 

环境描述

 
 
python3.6
celery5.0.5
windows 32位
 
 

调用celery任务代码摘要

 
# 1.send_task返回AsyncResult
# 2.AsyncResult的get()等待返回结果
# 3.get()会比真实多耗费1秒,并且每次请求都会出现

from celery import Celery
class xxxCelery(Celery):
    def call_xxx(self, name, timeout=120, **kwargs):
        LOG.info("send_task: %s" % locals())
        start = time.time()
        r = self.send_task(name, **kwargs)
        g = eventlet.spawn(r.get, timeout=timeout)
        result = g.wait()
        print("cost: %s" % time.time()-start)
        return
 

两个版本比对

 
分析celery源码之后,可以知道问题在drain_events()内部, 比较5.2.0和5.2.1版本
 
#celery/backends/asynchronuse.py

class greenletDrainer(Drainer):
    ...
    def run(self):
        self._started.set()
        while not self._stopped.is_set():
            try:
                self.result_consumer.drain_events(timeout=1)
                # 新增了两句, 估摸着是这个问题
                self._send_drain_complete_event()
                self._create_drain_complete_event()
            except socket.timeout:
                pass
        self._shutdown.set()
 
 

解决办法

 
    celery升级到5.2.1
 
    python要升级到3.7以上版本(celery要求python3.7以上版本)
 

解决效果

 
    耗时从1000多ms变成了30ms
 

Tags: celery
评论(73) 阅读(2370)

eventlet如何绿化pyserial最好

2021年3月24日 19:19

 

问题

 
pyserial访问windows中的串口,如何绿化?不阻塞协程? 效率最高? 绿化毫无疑问要借用tpool.execute.
 
代码该如何实现才最佳呢?
 

方法一: 直接recv

 
直接recv,会阻塞协程
 
缺点: 协程被阻塞,程序不可用
 

方法二: tpool+in_waiting判断

 
import serial
from eventlet import tpool
class SerialSocket(serial.Serial):
    def recv(self):
        # 读完
        if self.in_waiting:
            return tpool.execute(self.read, self.in_waiting)
        return data

def listen_message():
    ss = SerialSocket("com10")
    buffer = b''
    while True:
        data = ss.recv()
        if data is not None:
            buffer +=data
 
缺点: cpu利用率较高
 

方法三: tpool+in_waiting判断+sleep

 
import serial
from eventlet import tpool, sleep
class SerialSocket(serial.Serial):
    def recv(self):
        # 读完
        if self.in_waiting:
            return tpool.execute(self.read, self.in_waiting)
        return data

def listen_message():
    ss = SerialSocket("com10")
    buffer = b''
    while True:
        data = ss.recv()
        if data is not None:
            buffer +=data
        else:
            sleep(0.01)
 
缺点: 满足大部分需求, 对时间要求比较敏感的程序,还是不行,多了0.01秒
 

方法四(完美):tpool+in_waiting判断+read(1)阻塞

 
利用serial的read()阻塞特性,加上tpool线程,最终线程阻塞转换成协程阻塞
 
import serial
from eventlet import tpool

class SerialSocket(serial.Serial):
    def recv(self):
        # 等待
        data = tpool.execute(self.read, 1)
        # 读完
        while self.in_waiting:
            data += tpool.execute(self.read, self.in_waiting)
        return data

def listen_message():
    ss = SerialSocket("com10")
    buffer = b''
    while True:
        g = evenlet.spawn(ss.recv)
        data = g.wait()
        buffer +=data
 
 

Tags: eventlet pyserial
评论(0) 阅读(1066)

eventlet.monkey_patch是否影响threading

2021年3月24日 13:42

说明

 
evenlet.monkey_patch()默认会绿化thread,并未看到绿化threading,为什么实际中threading被绿化了,如何证明? 测试代码如下
 

修改evenlet,加上标记语句

 
修改eventlet.green.thread.py的start_new_thread方法,加一条打印语句
 
def start_new_thread(function, args=(), kwargs=None):
    ...
    print("hello: ", locals())
    g = greenthread.spawn_n(__thread_body, function, args, kwargs)
    return get_ident(g)
 
 

测试代码

 
# -*- coding:utf-8 -*-
# from eventlet import patcher
# original_threading = patcher.original('threading')
import threading as original_threading
import time
import eventlet
eventlet.monkey_patch()


def thread1_run():
    while True:
        print("thread1:", time.time())
        time.sleep(1)


if __name__ == "__main__":
    thread1 = original_threading.Thread(target=thread1_run, name="thread1", daemon=True)
    print(type(thread1))
    thread1.start()

    e = eventlet.Event()
    e.wait()
 

 

Tags: eventlet
评论(39) 阅读(1586)

python自定义windowsr日志支持文件分割

2021年1月20日 13:48

描述

 
python自带的RotatingFileHandler,在windows中运行, 分割文件rename时会出问题。因为windows不支持rename正在使用的文件
 

解决办法

 
自定义handler, 按大小和日期切割文件
 
原理: handler写文件时调用emit, 其中shouldRollover判断是否要分割, doRollover进行分割. 重写这个两个函数就可以
 
class BaseRotatingHandler(logging.FileHandler):

    def emit(self, record):
        """
        Emit a record.

        Output the record to the file, catering for rollover as described
        in doRollover().
        """
        try:
            if self.shouldRollover(record):
                self.doRollover()
            logging.FileHandler.emit(self, record)
        except Exception:
            self.handleError(record)
 

示例

 
import os
import time
import datetime
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path


class AutumnRotatingFileHandler(RotatingFileHandler):
    """
    日志+大小+支持windows
    """

    def __init__(self, filename, mode='a', maxBytes=0, backupCount=99, encoding=None, delay=False, backupDayCount=30):
        filename = str(filename)
        self.date_format = "%Y%m%d"
        self.create_date = self._now_date()
        self.backupCount = backupCount

        # 保存原始文件名
        self.filename = str(Path(filename).absolute())
        self.backupDayCount = backupDayCount

        RotatingFileHandler.__init__(self, self.last_file_name(filename), mode=mode, maxBytes=maxBytes, backupCount=backupCount, encoding=encoding, delay=delay)

    def _now_date(self):
        return time.strftime(self.date_format, time.localtime())

    def doRollover(self):
        """
        Do a rollover, as described in __init__().
        """
        # 关闭当前文件
        if self.stream:
            self.stream.close()
            self.stream = None

        # 生成最新文件名
        suffix = "."+ self._now_date()
        self.baseFilename = str(self.filename) + suffix
        if self.backupCount > 0:
            for i in range(0, self.backupCount):
                number_suffix = "{:0>2d}".format(i)
                sfn = ".".join([self.baseFilename, number_suffix])
                if os.path.exists(sfn):
                    continue
                else:
                    break
            number_suffix = "{:0>2d}".format(i)
            self.baseFilename = ".".join([self.baseFilename, number_suffix])

        # 删除过期文件
        self.deleteExpiredFiles()

        # 打开
        self.stream = self._open()
        self.create_date = self._now_date()

    def shouldRollover(self, record):
        """
        Determine if rollover should occur.

        Basically, see if the supplied record would cause the file to exceed
        the size limit we have.
        """
        # 文件分割条件1: 日期变化
        if self._now_date() != self.create_date:
            return 1

        if self.stream is None:                 # delay was set...
            self.stream = self._open()

        # 文件分割条件1: 文件大小超过限制
        if self.maxBytes > 0:                   # are we rolling over?
            msg = "%s\n" % self.format(record)
            self.stream.seek(0, 2)  #due to non-posix-compliant Windows feature
            if self.stream.tell() + len(msg) >= self.maxBytes:
                return 1
        return 0

    def deleteExpiredFiles(self):
        """
        删除过期文件
        """
        dead_datetime = datetime.datetime.now() - datetime.timedelta(days=self.backupDayCount)

        filenames = []
        for i in range(1, 3):
            date = dead_datetime - datetime.timedelta(days=1)
            date_str = date.strftime(self.date_format)

            base_name = "%s.%s" % (self.filename, date_str)
            filenames.append(base_name)

            for j in range(1, self.backupCount):
                sfn = "%s.%d" % (base_name, j)
                filenames.append(sfn)

        # delete file
        for filename in filenames:
            if os.path.exists(filename):
                try:
                    os.remove(sfn)
                except:
                    pass

    def last_file_name(self, filename):
        """
        文件名
        """
        suffix = "."+self._now_date()
        base_filename = str(filename) + suffix
        if self.backupCount > 0:
            for i in reversed(range(0, self.backupCount)):
                number_suffix = "{:0>2d}".format(i)
                sfn = ".".join([base_filename, number_suffix])
                # 最近存在的文件
                if os.path.exists(sfn):
                    break
                else:
                    continue
            number_suffix = "{:0>2d}".format(i)
            base_filename = ".".join([base_filename, number_suffix])
        return base_filename
 

评论(2) 阅读(1093)

eventlet模块中select出现ValueError错误

2021年1月02日 10:34

 

问题说明

 
如果socket句柄是-1, select.select()在判断句柄状态时会报错 ValueError: file descriptor cannot be a negative integer (-1)
 
而eventlet的select模块没有处理这种异常
 

出现环境

 
windows、python3.8、eventlet 0.26 在使用eventlet开发socket程序时, 如果客户端强制中断连接,会出现如下错误
 

错误描述

 
exception: [WinError 10054] 远程主机强迫关闭了一个现有的连接。
...
ValueError: file descriptor cannot be a negative integer (-1)
 
### 解决办法
 
修改文件eventlet\hubs\selects.py,然后把eventlet重新打包
 
    def wait(self, seconds=None):
        ....
        try:
            r, w, er = select.select(reader_fds, writer_fds, all_fds, seconds)
        # 捕捉异常并移除句柄-1
        except ValueError as e:
            self.remove_descriptor(-1)
            return
        except select.error as e:
            if support.get_errno(e) == errno.EINTR:
                return
            elif support.get_errno(e) in BAD_SOCK:
                self._remove_bad_fds()
                return
            else:
                raise
 

总结

 
1.很奇怪eventlet怎么没有修复这种兼容性错误?
 
2.模拟错误的方法  select.select([-1], [], [], 0)
 
3. socket中断句柄怎么变成了-1,非法的句柄-1又是如何进入eventlet的?
 
 

Tags: eventlet
评论(1) 阅读(4282)

eventlet绿化和patch原理

2020年11月21日 12:09

说明

 
eventlet是一个必备工具,经常用,绿化原理有点忘记了,重新复习一遍.
 
 

三个主要问题

 
1. 绿化的原理是什么?
 
2. 绿化怎么管理?
 
3. 绿化怎么引入?
 

绿化原理

 
利用select/epolls/kqueue等操作系统提供的非阻塞操作,将阻塞改为非阻塞.
 

引用管理

 
eventlet在import之后,将模块中的属性绿化.
 
用一小段代码来查看看
import sys
import eventlet
# eventlet.monkey_patch()

httplib2 = eventlet.import_patched('httplib2')
print(httplib2)
print(httplib2.socket)

print("================")
for k,v in sys.modules.items():
    if "socket" in k:
        print(k, v)


# 打印内容
<module 'httplib2' from 'D:\\workspace\\venv\\xxx\\lib\\site-packages\\httplib2\\__init__.py'>
<module 'eventlet.green.socket' from 'D:\\workspace\\venv\\xxx\\lib\\site-packages\\eventlet\\green\\socket.py'>
================
_socket <module '_socket' from 'c:\\python\\python36\\DLLs\\_socket.pyd'>
socket <module 'socket' from 'c:\\python\\python36\\lib\\socket.py'>
__original_module_socket <module 'socket' from 'c:\\python\\python36\\lib\\socket.py'>

# 说明了什么?
绿化只是替换httplib2模块中的引用属性socket。并未改变sys.modules中的属性
 

引入绿化方法一:直接import

 
from eventlet.green import socket
 

引入绿化方法二:eventlet.import_patch

 
1.好处: 能绿化模块内部的系统模块
2.只能绿化os, select, socket, thread, time, psycopy, MySQLdb, bultines, subprocess
3.如果是import threading,不会自动绿化, 需要from eventlet.green import threading
 
import eventlet
httplib2 = eventlet.import_patched('httplib2')
print(httplib2)
print(httplib2.socket)

# 打印
<module 'httplib2' from 'D:\\workspace\\venv\\autumn-secs\\lib\\site-packages\\httplib2\\__init__.py'>
<module 'eventlet.green.socket' from 'D:\\workspace\\venv\\autumn-secs\\lib\\site-packages\\eventlet\\green\\socket.py'>
 

引入绿化方法三:eventlet.monkey_patch

 
1.与import_patch不同,会直接修改sys.modules
2.但是也只能绿化os,select等等几个模块,其它模块需要用直接引用绿化
 
 
 

Tags: eventlet
评论(15) 阅读(1988)

实用工具状态机transitions

2020年11月21日 09:42

说明

 
1. 状态机是一个非常实用的理论。在涉及到复杂的场景,建立状态机模型,能带来极大的方便。比如,网络连接、模型状态、业务逻辑。
 
2. 状态机并不复杂, 重要的是它的思想,能够极大减轻复杂度。使用时关键在于定义好事件和动作。
 
 

 基本概念

 
State: 状态
Event: 事件. 事件触发状态变换
Action: 动作. event发生前或后执行的动作
transition: 变换. 状态变换
 

github

 
https://github.com/pytransitions/transitions
 

安装

 
pip install transitions
 

简单示例

# 连接协议状态机
from transitions.extensions import HierarchicalMachine as Machine
from transitions.extensions.nesting import NestedState

class ConnectionStateMachine:
    """Connection state machine."""

    def __init__(self, callbacks=None):
        """
        :param callbacks: callbacks for the state machine
        """
        self.callbacks = {}

        # 定义状态
        self.states = ["STATE_NOT_CONNECTED",
                       {
                            'name': "STATE_CONNECTED",   # 状态名
                            'on_enter': self._on_enter_CONNECTED,  # 进入状态触发
                            'on_exit': self._on_exit_CONNECTED,    # 退出状态触发
                            'children': [     # 状态嵌套
                                "STATE_NOT_SELECTED",
                                {
                                    'name': "STATE_SELECTED",
                                    'on_enter': self._on_enter_CONNECTED_SELECTED
                                }
                            ]
                       }]

        # transition 1
        self.machine = Machine(model=self, states=self.states, initial="STATE_NOT_CONNECTED", auto_transitions=False)

        if callbacks:
            self.callbacks = callbacks

        # 定义状态变换
        self.machine.add_transition('connect', "STATE_NOT_CONNECTED", "STATE_CONNECTED_NOT_SELECTED")  # transition 2
        self.machine.add_transition('disconnect', "STATE_CONNECTED", "STATE_NOT_CONNECTED")  # transition 3
        self.machine.add_transition('select', "STATE_CONNECTED_NOT_SELECTED", "STATE_CONNECTED_SELECTED")  # transition 4
        self.machine.add_transition('deselect', "STATE_CONNECTED_SELECTED", "STATE_CONNECTED_NOT_SELECTED")  # transition 5
        self.machine.add_transition('timeoutT7', "STATE_CONNECTED_NOT_SELECTED", "STATE_NOT_CONNECTED")  # transition 6

    # 事件触发的动作
    def _on_enter_CONNECTED(self):
        if "on_enter_CONNECTED" in self.callbacks:
            self.callbacks["on_enter_CONNECTED"]()

    def _on_exit_CONNECTED(self):
        if "on_exit_CONNECTED" in self.callbacks:
            self.callbacks["on_exit_CONNECTED"]()

    def _on_enter_CONNECTED_SELECTED(self):
        if "on_enter_CONNECTED_SELECTED" in self.callbacks:
            self.callbacks["on_enter_CONNECTED_SELECTED"]()
 
 
 

定义状态机

 
# 一般都是两种:不嵌套、嵌套
from transitions import Machine   #不嵌套
from transitions.extensions import HierarchicalMachine as Machine # 嵌套

# 检查状态
machine.state
machine.is_<state_name>()
 

定义状态

# 状态可以是三种类型: 对象、字符串、字典
from transitions import State
states = [
    State(name="solid"),    # 对象
    "liquid",               # 字符串
    {"name": "gas"}         # 字典
]
 
 

定义变换

 
# 定义函数
machine.add_transition(trigger, source, dest, )
trigger(str)         方法名,触发状态转换
source(str or list)  源状态
dest(str)            目标状态

# 加上变换
machine.add_transition(trigger="wake_up", source="asleep", dest="hanging out")
machine.add_transition('work_out', 'hanging out', 'hungry')
machine.add_transition('nap', '*', 'asleep')   # 从任意状态变为asleep
 
 

 

Tags: 状态机
评论(7) 阅读(1262)

eventlet并发读写socket出现Second simultaneous问题

2020年7月08日 10:50

 

描述

 
celery+eventlet实现任务调用时出现RuntimeError: Second simultaneous read on fileno 14 detected.  Unless you really know what you're doing, make sure that only one greenthread can read any particular socket
 
 

模拟错误

 
网上找的一段代码,模拟出同样问题
 
def main():
    import eventlet
    httplib2 = eventlet.import_patched('httplib2')
    shared_resource = httplib2.Http()

    def get_url():
        resp, content = shared_resource.request("http://eventlet.net")
        return content

    p = eventlet.GreenPile()
    p.spawn(get_url)
    p.spawn(get_url)
    results = list(p)
    assert results[0] == results[1]


if __name__ == "__main__":
    main()
 

错误内容

 
Traceback (most recent call last):
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/poll.py", line 111, in wait
    listener.cb(fileno)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenthread.py", line 221, in main
    result = function(*args, **kwargs)
  File "test.py", line 12, in get_url
    resp, content = shared_resource.request("http://eventlet.net")
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1994, in request
    cachekey,
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1651, in _request
    conn, request_uri, method, body, headers
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1558, in _conn_request
    conn.request(method, request_uri, body, headers)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 403, in sendall
    tail = self.send(data, flags)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 397, in send
    return self._send_loop(self.fd.send, data, flags)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 392, in _send_loop
    timeout_exc=_timeout_exc)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 210, in _trampoline
    mark_as_closed=self._mark_as_closed)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/__init__.py", line 157, in trampoline
    listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/epolls.py", line 22, in add
    listener = hub.BaseHub.add(self, evtype, fileno, cb, tb, mac)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/hub.py", line 181, in add
    evtype, fileno, evtype, cb, bucket[fileno]))
RuntimeError: Second simultaneous write on fileno 5 detected.  Unless you really know what you're doing, make sure that only one greenthread can write any particular socket.  Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of GreenThread object at 0x7f25df161680>; THAT THREAD=FdListener('write', 5, <built-in method switch of GreenThread object at 0x7f25df161470>, <built-in method throw of GreenThread object at 0x7f25df161470>)
 

 解决方法

 
猜测可能是没绿化完整,改成如下,果然就正常了。
 
def main():
    import eventlet
#     httplib2 = eventlet.import_patched('httplib2')
    import http2
    eventlet.monkey_patch()
    shared_resource = httplib2.Http()

    def get_url():
        resp, content = shared_resource.request("http://eventlet.net")
        return content

    p = eventlet.GreenPile()
    p.spawn(get_url)
    p.spawn(get_url)
    results = list(p)
    assert results[0] == results[1]


if __name__ == "__main__":
    main()
 

结论

  • 局部import的模块,可能使用了未被绿化的模块,导出eventlet出现问题
  • 延迟加载的模块配合eventlet也会出现同样问题
 

 

Tags: eventlet
评论(47) 阅读(3529)

celery配合redis出现redis.exceptions.InvalidResponse Protocol Error

2020年6月21日 09:29

Tags: Redis Celery
评论(69) 阅读(6822)