Custom Python log handler for Windows with file rotation support

January 20, 2021, 13:48

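Description

Python's built-in RotatingFileHandler has trouble on Windows: rotating the log renames the file, and Windows does not allow renaming a file that is in use.

Solution

Write a custom handler that rotates files by size and by date.

How it works: when a handler writes to the file it calls emit, which uses shouldRollover to decide whether to rotate and doRollover to perform the rotation. Overriding these two methods is enough. The custom handler in the example never renames the open file; it simply starts writing to a new file name.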
 
class BaseRotatingHandler(logging.FileHandler):

    def emit(self, record):
        """
        Emit a record.

        Output the record to the file, catering for rollover as described
        in doRollover().
        """
        try:
            if self.shouldRollover(record):
                self.doRollover()
            logging.FileHandler.emit(self, record)
        except Exception:
            self.handleError(record)
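
To make the emit / shouldRollover / doRollover flow concrete, here is a minimal sketch that rotates by switching to a new numbered file rather than renaming. The class name NewFileRotatingHandler, the max_records parameter, and the demo function are assumptions made up for illustration; they are not part of the logging module or of the handler in this post.

```python
import logging
import os
import tempfile
from logging.handlers import BaseRotatingHandler


class NewFileRotatingHandler(BaseRotatingHandler):
    """Hypothetical handler: rotates by opening a new file, never by renaming."""

    def __init__(self, filename, max_records=2, encoding=None):
        self.original_name = os.path.abspath(filename)
        self.max_records = max_records      # rotate after this many records
        self.index = 0                      # numeric suffix of the current file
        self.records_written = 0
        super().__init__(self._current_name(), "a", encoding=encoding)

    def _current_name(self):
        return "{}.{:02d}".format(self.original_name, self.index)

    def shouldRollover(self, record):
        # Called by BaseRotatingHandler.emit() before every write
        return self.records_written >= self.max_records

    def doRollover(self):
        # Close the current file and point the handler at the next name
        if self.stream:
            self.stream.close()
            self.stream = None
        self.index += 1
        self.records_written = 0
        self.baseFilename = self._current_name()
        self.stream = self._open()

    def emit(self, record):
        super().emit(record)
        self.records_written += 1


def demo():
    """Log five records and return {file name: lines} for the files produced."""
    with tempfile.TemporaryDirectory() as directory:
        handler = NewFileRotatingHandler(os.path.join(directory, "app.log"), max_records=2)
        logger = logging.getLogger("rollover-demo")
        logger.setLevel(logging.INFO)
        logger.propagate = False
        logger.addHandler(handler)
        for n in range(5):
            logger.info("message %d", n)
        logger.removeHandler(handler)
        handler.close()
        result = {}
        for name in sorted(os.listdir(directory)):
            with open(os.path.join(directory, name)) as fh:
                result[name] = fh.read().splitlines()
        return result
```

Calling demo() writes five records with a limit of two per file, so the records end up spread over several numbered files and no open file is ever renamed.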
 

Example

 
import os
import time
import datetime
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path


class AutumnRotatingFileHandler(RotatingFileHandler):
    """
    Rotate by date and by size; works on Windows.
    """

    def __init__(self, filename, mode='a', maxBytes=0, backupCount=99, encoding=None, delay=False, backupDayCount=30):
        filename = str(filename)
        self.date_format = "%Y%m%d"
        self.create_date = self._now_date()
        self.backupCount = backupCount

        # Keep the original file name (without date or index suffix)
        self.filename = str(Path(filename).absolute())
        self.backupDayCount = backupDayCount

        RotatingFileHandler.__init__(self, self.last_file_name(filename), mode=mode, maxBytes=maxBytes, backupCount=backupCount, encoding=encoding, delay=delay)

    def _now_date(self):
        return time.strftime(self.date_format, time.localtime())

    def doRollover(self):
        """
        Do a rollover, as described in __init__().
        """
        # Close the current file
        if self.stream:
            self.stream.close()
            self.stream = None

        # Build the newest file name: <name>.<date>.<two-digit index>
        suffix = "."+ self._now_date()
        self.baseFilename = str(self.filename) + suffix
        if self.backupCount > 0:
            for i in range(0, self.backupCount):
                number_suffix = "{:0>2d}".format(i)
                sfn = ".".join([self.baseFilename, number_suffix])
                if os.path.exists(sfn):
                    continue
                else:
                    break
            number_suffix = "{:0>2d}".format(i)
            self.baseFilename = ".".join([self.baseFilename, number_suffix])

        # Delete expired files
        self.deleteExpiredFiles()

        # Open the new file
        self.stream = self._open()
        self.create_date = self._now_date()

    def shouldRollover(self, record):
        """
        Determine if rollover should occur.

        Basically, see if the supplied record would cause the file to exceed
        the size limit we have.
        """
        # Rotation condition 1: the date has changed
        if self._now_date() != self.create_date:
            return 1

        if self.stream is None:                 # delay was set...
            self.stream = self._open()

        # Rotation condition 2: the file would exceed the size limit
        if self.maxBytes > 0:                   # are we rolling over?
            msg = "%s\n" % self.format(record)
            self.stream.seek(0, 2)  #due to non-posix-compliant Windows feature
            if self.stream.tell() + len(msg) >= self.maxBytes:
                return 1
        return 0

    def deleteExpiredFiles(self):
        """
        Delete expired log files.
        """
        dead_datetime = datetime.datetime.now() - datetime.timedelta(days=self.backupDayCount)

        filenames = []
        # Check the two days just past the retention window
        for i in range(1, 3):
            date = dead_datetime - datetime.timedelta(days=i)
            date_str = date.strftime(self.date_format)

            base_name = "%s.%s" % (self.filename, date_str)
            filenames.append(base_name)

            # Numbered files use the same two-digit suffix as doRollover (00, 01, ...)
            for j in range(0, self.backupCount):
                sfn = "%s.%02d" % (base_name, j)
                filenames.append(sfn)

        # Delete the files that exist
        for filename in filenames:
            if os.path.exists(filename):
                try:
                    os.remove(filename)
                except OSError:
                    pass

    def last_file_name(self, filename):
        """
        Return the name of the most recent log file for today.
        """
        suffix = "."+self._now_date()
        base_filename = str(filename) + suffix
        if self.backupCount > 0:
            for i in reversed(range(0, self.backupCount)):
                number_suffix = "{:0>2d}".format(i)
                sfn = ".".join([base_filename, number_suffix])
                # The most recent file that already exists
                if os.path.exists(sfn):
                    break
                else:
                    continue
            number_suffix = "{:0>2d}".format(i)
            base_filename = ".".join([base_filename, number_suffix])
        return base_filename
 


ValueError raised by select in the eventlet module

January 2, 2021, 10:34

 

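Problem

If a socket handle is -1, select.select() raises ValueError: file descriptor cannot be a negative integer (-1) when it checks the handle's state.

eventlet's select hub does not handle this exception.

Environment

Windows, Python 3.8, eventlet 0.26. When developing a socket program with eventlet, the following error appears if the client forcibly drops the connection.

Error

exception: [WinError 10054] An existing connection was forcibly closed by the remote host.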
...
ValueError: file descriptor cannot be a negative integer (-1)
 
Solution

Modify the file eventlet\hubs\selects.py, then repackage eventlet.
 
    def wait(self, seconds=None):
        ....
        try:
            r, w, er = select.select(reader_fds, writer_fds, all_fds, seconds)
        # Catch the exception and remove the -1 handle
        except ValueError as e:
            self.remove_descriptor(-1)
            return
        except select.error as e:
            if support.get_errno(e) == errno.EINTR:
                return
            elif support.get_errno(e) in BAD_SOCK:
                self._remove_bad_fds()
                return
            else:
                raise
 

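Summary

1. It is odd that eventlet has not fixed this compatibility error.

2. To reproduce the error: select.select([-1], [], [], 0)

3. Open questions: how does the handle of an interrupted socket become -1, and how does the invalid handle -1 get into eventlet?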
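
The reproduction in point 2 can be run with the standard library alone. The sketch below also shows one ordinary way a -1 can appear: in CPython a closed socket object reports fileno() == -1. Whether that is the path taken inside eventlet in this case is not established here; the helper name select_error is made up for illustration.

```python
import select
import socket


def select_error(fd_or_socket):
    """Return the ValueError message raised by select.select(), or None."""
    try:
        select.select([fd_or_socket], [], [], 0)
    except ValueError as exc:
        return str(exc)
    return None


sock = socket.socket()
sock.close()
closed_fileno = sock.fileno()      # file descriptor reported after close()

print(closed_fileno)
print(select_error(-1))            # error for a raw -1 descriptor
print(select_error(sock))          # error for the closed socket object
```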
 
 

Tags: eventlet

How eventlet greening and patching work

November 21, 2020, 12:09

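Notes

eventlet is an essential tool that I use often, but I had partly forgotten how greening works, so this is a review.

Three main questions

1. What is the principle behind greening?

2. How is greening managed?

3. How is greening brought in?

How greening works

It uses the non-blocking facilities provided by the operating system, such as select/epoll/kqueue, to turn blocking operations into non-blocking ones.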
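
As a rough illustration of that idea using only the standard library (this is not eventlet's code, and recv_when_ready is a made-up helper), a socket can be put in non-blocking mode and polled with select, so the caller is never stuck inside recv. A green thread hub would switch to another green thread at the point where this sketch returns None.

```python
import select
import socket


def recv_when_ready(sock, timeout=1.0):
    """Read from sock only if select reports it readable; otherwise return None."""
    sock.setblocking(False)
    readable, _, _ = select.select([sock], [], [], timeout)
    if not readable:
        return None             # nothing to read yet; a hub would run other work here
    return sock.recv(1024)


a, b = socket.socketpair()
nothing_yet = recv_when_ready(b, timeout=0)   # no data has been sent so far
a.sendall(b"ping")
data = recv_when_ready(b)
a.close()
b.close()
```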
 

Reference management

After importing a module, eventlet greens the attributes inside that module.

A short piece of code shows this:
import sys
import eventlet
# eventlet.monkey_patch()

httplib2 = eventlet.import_patched('httplib2')
print(httplib2)
print(httplib2.socket)

print("================")
for k,v in sys.modules.items():
    if "socket" in k:
        print(k, v)


# Output
<module 'httplib2' from 'D:\\workspace\\venv\\xxx\\lib\\site-packages\\httplib2\\__init__.py'>
<module 'eventlet.green.socket' from 'D:\\workspace\\venv\\xxx\\lib\\site-packages\\eventlet\\green\\socket.py'>
================
_socket <module '_socket' from 'c:\\python\\python36\\DLLs\\_socket.pyd'>
socket <module 'socket' from 'c:\\python\\python36\\lib\\socket.py'>
__original_module_socket <module 'socket' from 'c:\\python\\python36\\lib\\socket.py'>

# What does this show?
Greening only replaces the socket attribute referenced inside the httplib2 module. The entries in sys.modules are not changed.
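
The same effect can be imitated in plain Python. This is only an illustration of rebinding a module attribute versus changing sys.modules, not eventlet's implementation; the modules green_socket and library are hypothetical stand-ins.

```python
import socket as original_socket
import sys
import types

# Hypothetical stand-ins: a "green" replacement and a library that uses socket
green_socket = types.ModuleType("green_socket")
library = types.ModuleType("library")

# What "import socket" inside the library would bind
library.socket = original_socket

# Rebind only the library's own reference
library.socket = green_socket

library_is_green = library.socket is green_socket
sys_modules_untouched = sys.modules["socket"] is original_socket
print(library_is_green, sys_modules_untouched)
```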
 

Greening method 1: import directly

 
from eventlet.green import socket
 

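Greening method 2: eventlet.import_patched

1. Benefit: it greens the system modules used inside the imported module
2. Only os, select, socket, thread, time, psycopg, MySQLdb, builtins, and subprocess can be greened
3. A plain import threading is not greened automatically; use from eventlet.green import threading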
 
import eventlet
httplib2 = eventlet.import_patched('httplib2')
print(httplib2)
print(httplib2.socket)

# Output
<module 'httplib2' from 'D:\\workspace\\venv\\autumn-secs\\lib\\site-packages\\httplib2\\__init__.py'>
<module 'eventlet.green.socket' from 'D:\\workspace\\venv\\autumn-secs\\lib\\site-packages\\eventlet\\green\\socket.py'>
 

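Greening method 3: eventlet.monkey_patch

1. Unlike import_patched, it modifies sys.modules directly
2. It still only greens os, select, and the few other modules listed; other modules have to be greened by importing the green version directly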
 
 
 

Tags: eventlet

A practical tool: the transitions state machine library

November 21, 2020, 09:42

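Notes

1. State machines are a very practical idea. In complex scenarios, building a state machine model makes things much easier, for example for network connections, model states, and business logic.

2. A state machine is not complicated. What matters is the idea behind it, which greatly reduces complexity. The key when using one is to define the events and actions well.

Basic concepts

State: a state
Event: an event; events trigger state changes
Action: an action executed before or after an event occurs
Transition: a change from one state to another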
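
The four concepts fit in a few lines of plain Python. The sketch below is a hypothetical toy, not the API of the transitions library; TinyMachine and its method names are invented for illustration.

```python
class TinyMachine:
    """Hypothetical minimal state machine showing State, Event, Action, Transition."""

    def __init__(self, initial):
        self.state = initial                 # State: where the machine is now
        self.transitions = {}                # (event, source) -> (dest, action)

    def add_transition(self, event, source, dest, action=None):
        # Transition: firing `event` in `source` moves the machine to `dest`
        self.transitions[(event, source)] = (dest, action)

    def fire(self, event):
        # Event: look up the transition allowed from the current state
        key = (event, self.state)
        if key not in self.transitions:
            raise ValueError("event %r is not allowed in state %r" % (event, self.state))
        dest, action = self.transitions[key]
        if action is not None:
            action(self.state, dest)         # Action: runs just before the state changes
        self.state = dest


history = []
machine = TinyMachine("NOT_CONNECTED")
machine.add_transition("connect", "NOT_CONNECTED", "CONNECTED",
                       action=lambda src, dst: history.append((src, dst)))
machine.add_transition("disconnect", "CONNECTED", "NOT_CONNECTED")

machine.fire("connect")
state_after_connect = machine.state
machine.fire("disconnect")
```

Firing an event that has no transition from the current state raises ValueError, which is the kind of guard that keeps connection handling code from drifting into invalid states.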
 

GitHub

 
https://github.com/pytransitions/transitions
 

Installation

 
pip install transitions
 

Simple example

# State machine for a connection protocol
from transitions.extensions import HierarchicalMachine as Machine
from transitions.extensions.nesting import NestedState

class ConnectionStateMachine:
    """Connection state machine."""

    def __init__(self, callbacks=None):
        """
        :param callbacks: callbacks for the state machine
        """
        self.callbacks = {}

        # Define the states
        self.states = ["STATE_NOT_CONNECTED",
                       {
                            'name': "STATE_CONNECTED",   # state name
                            'on_enter': self._on_enter_CONNECTED,  # called when entering the state
                            'on_exit': self._on_exit_CONNECTED,    # called when leaving the state
                            'children': [     # nested states
                                "STATE_NOT_SELECTED",
                                {
                                    'name': "STATE_SELECTED",
                                    'on_enter': self._on_enter_CONNECTED_SELECTED
                                }
                            ]
                       }]

        # transition 1
        self.machine = Machine(model=self, states=self.states, initial="STATE_NOT_CONNECTED", auto_transitions=False)

        if callbacks:
            self.callbacks = callbacks

        # Define the transitions
        self.machine.add_transition('connect', "STATE_NOT_CONNECTED", "STATE_CONNECTED_NOT_SELECTED")  # transition 2
        self.machine.add_transition('disconnect', "STATE_CONNECTED", "STATE_NOT_CONNECTED")  # transition 3
        self.machine.add_transition('select', "STATE_CONNECTED_NOT_SELECTED", "STATE_CONNECTED_SELECTED")  # transition 4
        self.machine.add_transition('deselect', "STATE_CONNECTED_SELECTED", "STATE_CONNECTED_NOT_SELECTED")  # transition 5
        self.machine.add_transition('timeoutT7', "STATE_CONNECTED_NOT_SELECTED", "STATE_NOT_CONNECTED")  # transition 6

    # Actions triggered by events
    def _on_enter_CONNECTED(self):
        if "on_enter_CONNECTED" in self.callbacks:
            self.callbacks["on_enter_CONNECTED"]()

    def _on_exit_CONNECTED(self):
        if "on_exit_CONNECTED" in self.callbacks:
            self.callbacks["on_exit_CONNECTED"]()

    def _on_enter_CONNECTED_SELECTED(self):
        if "on_enter_CONNECTED_SELECTED" in self.callbacks:
            self.callbacks["on_enter_CONNECTED_SELECTED"]()
 
 
 

Defining the state machine

# Usually one of two kinds: flat or nested
from transitions import Machine   # flat
from transitions.extensions import HierarchicalMachine as Machine  # nested

# Check the state
machine.state
machine.is_<state_name>()
 

Defining states

# A state can be one of three types: object, string, or dict
from transitions import State
states = [
    State(name="solid"),    # object
    "liquid",               # string
    {"name": "gas"}         # dict
]
 
 

Defining transitions

# Method signature
machine.add_transition(trigger, source, dest)
trigger(str)         method name that triggers the state transition
source(str or list)  source state(s)
dest(str)            destination state

# Add transitions
machine.add_transition(trigger="wake_up", source="asleep", dest="hanging out")
machine.add_transition('work_out', 'hanging out', 'hungry')
machine.add_transition('nap', '*', 'asleep')   # from any state to asleep
 
 

 

Tags: state machine

Ill will does 1000 points of harm to yourself and 1 point to others

October 8, 2020, 15:09

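You can stew in resentment for days, yet taking revenge on someone may last only a few minutes.
Conversely, gratitude can keep you in good spirits for days, while repaying someone takes hardly any time at all.


Grateful people are rare, all the more so because the adult world runs on interests, not on right and wrong.
Most of the time good deeds go unrewarded, and sometimes the opposite happens.

So what do you gain by choosing to be a good person?
1. Safety. It lowers the odds of running into disaster.
2. Kind thoughts benefit you at every moment.


So a wise person will always choose to be a good person.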
 

 


"Second simultaneous" error when reading and writing a socket concurrently with eventlet

July 8, 2020, 10:50

 

Description

Running tasks with celery + eventlet raises RuntimeError: Second simultaneous read on fileno 14 detected.  Unless you really know what you're doing, make sure that only one greenthread can read any particular socket


Reproducing the error

This snippet, found online, reproduces the same problem.
 
def main():
    import eventlet
    httplib2 = eventlet.import_patched('httplib2')
    shared_resource = httplib2.Http()

    def get_url():
        resp, content = shared_resource.request("http://eventlet.net")
        return content

    p = eventlet.GreenPile()
    p.spawn(get_url)
    p.spawn(get_url)
    results = list(p)
    assert results[0] == results[1]


if __name__ == "__main__":
    main()
 

Error output

 
Traceback (most recent call last):
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/poll.py", line 111, in wait
    listener.cb(fileno)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenthread.py", line 221, in main
    result = function(*args, **kwargs)
  File "test.py", line 12, in get_url
    resp, content = shared_resource.request("http://eventlet.net")
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1994, in request
    cachekey,
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1651, in _request
    conn, request_uri, method, body, headers
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1558, in _conn_request
    conn.request(method, request_uri, body, headers)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 403, in sendall
    tail = self.send(data, flags)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 397, in send
    return self._send_loop(self.fd.send, data, flags)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 392, in _send_loop
    timeout_exc=_timeout_exc)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 210, in _trampoline
    mark_as_closed=self._mark_as_closed)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/__init__.py", line 157, in trampoline
    listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/epolls.py", line 22, in add
    listener = hub.BaseHub.add(self, evtype, fileno, cb, tb, mac)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/hub.py", line 181, in add
    evtype, fileno, evtype, cb, bucket[fileno]))
RuntimeError: Second simultaneous write on fileno 5 detected.  Unless you really know what you're doing, make sure that only one greenthread can write any particular socket.  Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of GreenThread object at 0x7f25df161680>; THAT THREAD=FdListener('write', 5, <built-in method switch of GreenThread object at 0x7f25df161470>, <built-in method throw of GreenThread object at 0x7f25df161470>)
 

Solution

My guess was that greening was incomplete. After changing the code as follows, it worked.
 
def main():
    import eventlet
    # httplib2 = eventlet.import_patched('httplib2')
    eventlet.monkey_patch()   # patch first so that later imports see the green modules
    import httplib2
    shared_resource = httplib2.Http()

    def get_url():
        resp, content = shared_resource.request("http://eventlet.net")
        return content

    p = eventlet.GreenPile()
    p.spawn(get_url)
    p.spawn(get_url)
    results = list(p)
    assert results[0] == results[1]


if __name__ == "__main__":
    main()
 

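Conclusion

  • A module imported locally may use modules that have not been greened, which causes problems in eventlet
  • Lazily loaded modules combined with eventlet run into the same problem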
 

 

Tags: eventlet

Celery with Redis raises redis.exceptions.InvalidResponse Protocol Error

June 21, 2020, 09:29

Tags: Redis Celery

eventlet on Ubuntu raises OSError: protocol not found

June 21, 2020, 09:24

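Description

The TensorFlow NVIDIA Docker image is based on Ubuntu 16.04. That Ubuntu is a trimmed-down build, so some packages may be missing. Running eventlet on it produces the error below.


Error output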

 
Traceback (most recent call last):
  File "/app/defect-client/defect_client/cmd/wafer-worker.py", line 14, in <module>
    import eventlet
  File "/usr/local/lib/python3.6/dist-packages/eventlet/__init__.py", line 10, in <module>
    from eventlet import convenience
  File "/usr/local/lib/python3.6/dist-packages/eventlet/convenience.py", line 7, in <module>
    from eventlet.green import socket
  File "/usr/local/lib/python3.6/dist-packages/eventlet/green/socket.py", line 21, in <module>
    from eventlet.support import greendns
  File "/usr/local/lib/python3.6/dist-packages/eventlet/support/greendns.py", line 69, in <module>
    setattr(dns.rdtypes.IN, pkg, import_patched('dns.rdtypes.IN.' + pkg))
  File "/usr/local/lib/python3.6/dist-packages/eventlet/support/greendns.py", line 59, in import_patched
    return patcher.import_patched(module_name, **modules)
  File "/usr/local/lib/python3.6/dist-packages/eventlet/patcher.py", line 126, in import_patched
    *additional_modules + tuple(kw_additional_modules.items()))
  File "/usr/local/lib/python3.6/dist-packages/eventlet/patcher.py", line 100, in inject
    module = __import__(module_name, {}, {}, module_name.split('.')[:-1])
  File "/usr/local/lib/python3.6/dist-packages/dns/rdtypes/IN/WKS.py", line 25, in <module>
    _proto_tcp = socket.getprotobyname('tcp')
OSError: protocol not found
 

Solution

 
apt-get -o Dpkg::Options::="--force-confmiss" install --reinstall netbase
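
The call that fails in the traceback is socket.getprotobyname('tcp'), which reads the system protocol database (on Debian and Ubuntu, /etc/protocols from the netbase package). A quick check like the following can confirm whether the lookup works before and after the reinstall; the helper name protocol_number is made up for illustration.

```python
import socket


def protocol_number(name="tcp"):
    """Return the protocol number for name, or None if the lookup fails."""
    try:
        return socket.getprotobyname(name)
    except OSError:
        # Same failure as in the traceback: "protocol not found"
        return None


print(protocol_number("tcp"))
```

If this prints None inside the container, the protocol database is still missing and importing eventlet will most likely fail in the same way.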
 

Tags: eventlet python

Sharing variables in Celery

June 21, 2020, 09:19

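Question

In many cases we want tasks to share variables. How can this be done?

Celery's concurrency model

Celery's concurrent task pool comes in eventlet, gevent, prefork, and thread types

eventlet/gevent coroutines: a single process with a single thread, so global variables are shared between tasks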
    
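prefork uses multiprocessing: globals set before the worker processes are forked are available in every child, but each child works on its own copy, so a change made in one process is not seen by the others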
 
thread (multithreading): global variables are shared
    

Verification

Use the ab command to simulate heavy concurrency; this is easy to test.
 
ab -n 1000 -c 100 -p ./post.txt -T application/json http://xxxx:5000/xxx
 

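Conclusion

1. If Celery tasks use resources such as a database or a GPU, there is no need to worry about loading them repeatedly

2. Note: if a global variable is initialized inside a task and the initialization is slow, a burst of task requests arriving at the same time may cause it to be initialized more than once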
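
One common way to guard against the repeated initialization in point 2 is lazy initialization behind a lock. The sketch below uses plain threads rather than Celery, so it only models the thread pool case; with prefork each worker process would still initialize its own copy. The names get_resource and _load_resource are hypothetical, and the sleep stands in for a slow load.

```python
import threading
import time

_resource = None
_resource_lock = threading.Lock()
init_count = 0


def _load_resource():
    """Hypothetical slow initialization (stands in for a model or DB connection)."""
    global init_count
    time.sleep(0.05)
    init_count += 1
    return {"ready": True}


def get_resource():
    """Return the shared resource, creating it at most once."""
    global _resource
    if _resource is None:                  # fast path once initialized
        with _resource_lock:
            if _resource is None:          # re-check: another thread may have finished first
                _resource = _load_resource()
    return _resource


# Simulate a burst of tasks that all need the resource at the same time
threads = [threading.Thread(target=get_resource) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Without the lock and the second check, several of the threads could enter _load_resource before the first one finishes.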
    
 
 

Tags: celery

Serializing numpy arrays with protobuf

June 15, 2020, 15:02

 

Notes

protobuf cannot handle numpy arrays directly; the array has to be converted to bytes first

Converting numpy to bytes

 
import numpy as np
from io import BytesIO
A = np.array([ 
      1, 2, 3, 4, 4,
      2, 3, 4, 5, 3,
      4, 5, 6, 7, 2,
      5, 6, 7, 8, 9,
      6, 7, 8, 9, 0 ]).reshape(5,5)

   
# numpy to bytes
nda_bytes = BytesIO()
np.save(nda_bytes, A, allow_pickle=False)

# bytes to numpy
nda_bytes = BytesIO(nda_bytes.getvalue())
B = np.load(nda_bytes, allow_pickle=False)
print(np.array_equal(A, B))
 

Defining the protobuf message

 
ndarray.proto
 
syntax = "proto3";

message NDArray {
    bytes ndarray = 1;
}
 

Usage

 
from io import BytesIO
import numpy as np
from ndarray_pb2 import NDArray   # generated from ndarray.proto above, e.g. protoc --python_out=. ndarray.proto


def ndarray_to_proto(nda: np.ndarray) -> NDArray:
    """
    Convert a numpy array to a proto message.
    """
    nda_bytes = BytesIO()
    np.save(nda_bytes, nda, allow_pickle=False)
    return NDArray(ndarray=nda_bytes.getvalue())


def proto_to_ndarray(nda_proto: NDArray) -> np.ndarray:
    nda_bytes = BytesIO(nda_proto.ndarray)
    return np.load(nda_bytes, allow_pickle=False)



A = np.array([ 
      1, 2, 3, 4, 4,
      2, 3, 4, 5, 3,
      4, 5, 6, 7, 2,
      5, 6, 7, 8, 9,
      6, 7, 8, 9, 0 ]).reshape(5,5)
serialized_A = ndarray_to_proto(A)
deserialized_A = proto_to_ndarray(serialized_A)
assert np.array_equal(A, deserialized_A)
 
 
 

Tags: protobuf