eventlet如何绿化pyserial最好

2021年3月24日 19:19

 

问题

 
pyserial访问windows中的串口,如何绿化?不阻塞协程? 效率最高? 绿化毫无疑问要借用tpool.execute.
 
代码该如何实现才最佳呢?
 

方法一: 直接recv

 
直接recv,会阻塞协程
 
缺点: 协程被阻塞,程序不可用
 

方法二: tpool+in_waiting判断

 
import serial
from eventlet import tpool
class SerialSocket(serial.Serial):
    def recv(self):
        # 读完
        if self.in_waiting:
            return tpool.execute(self.read, self.in_waiting)
        return data

def listen_message():
    ss = SerialSocket("com10")
    buffer = b''
    while True:
        data = ss.recv()
        if data is not None:
            buffer +=data
 
缺点: cpu利用率较高
 

方法三: tpool+in_waiting判断+sleep

 
import serial
from eventlet import tpool, sleep
class SerialSocket(serial.Serial):
    def recv(self):
        # 读完
        if self.in_waiting:
            return tpool.execute(self.read, self.in_waiting)
        return data

def listen_message():
    ss = SerialSocket("com10")
    buffer = b''
    while True:
        data = ss.recv()
        if data is not None:
            buffer +=data
        else:
            sleep(0.01)
 
缺点: 满足大部分需求, 对时间要求比较敏感的程序,还是不行,多了0.01秒
 

方法四(完美):tpool+in_waiting判断+read(1)阻塞

 
利用serial的read()阻塞特性,加上tpool线程,最终线程阻塞转换成协程阻塞
 
import serial
from eventlet import tpool

class SerialSocket(serial.Serial):
    def recv(self):
        # 等待
        data = tpool.execute(self.read, 1)
        # 读完
        while self.in_waiting:
            data += tpool.execute(self.read, self.in_waiting)
        return data

def listen_message():
    ss = SerialSocket("com10")
    buffer = b''
    while True:
        g = evenlet.spawn(ss.recv)
        data = g.wait()
        buffer +=data
 
 

Tags: eventlet pyserial
评论(0) 阅读(1107)

eventlet.monkey_patch是否影响threading

2021年3月24日 13:42

说明

 
evenlet.monkey_patch()默认会绿化thread,并未看到绿化threading,为什么实际中threading被绿化了,如何证明? 测试代码如下
 

修改evenlet,加上标记语句

 
修改eventlet.green.thread.py的start_new_thread方法,加一条打印语句
 
def start_new_thread(function, args=(), kwargs=None):
    ...
    print("hello: ", locals())
    g = greenthread.spawn_n(__thread_body, function, args, kwargs)
    return get_ident(g)
 
 

测试代码

 
# -*- coding:utf-8 -*-
# from eventlet import patcher
# original_threading = patcher.original('threading')
import threading as original_threading
import time
import eventlet
eventlet.monkey_patch()


def thread1_run():
    while True:
        print("thread1:", time.time())
        time.sleep(1)


if __name__ == "__main__":
    thread1 = original_threading.Thread(target=thread1_run, name="thread1", daemon=True)
    print(type(thread1))
    thread1.start()

    e = eventlet.Event()
    e.wait()
 

 

Tags: eventlet
评论(40) 阅读(1665)

eventlet模块中select出现ValueError错误

2021年1月02日 10:34

 

问题说明

 
如果socket句柄是-1, select.select()在判断句柄状态时会报错 ValueError: file descriptor cannot be a negative integer (-1)
 
而eventlet的select模块没有处理这种异常
 

出现环境

 
windows、python3.8、eventlet 0.26 在使用eventlet开发socket程序时, 如果客户端强制中断连接,会出现如下错误
 

错误描述

 
exception: [WinError 10054] 远程主机强迫关闭了一个现有的连接。
...
ValueError: file descriptor cannot be a negative integer (-1)
 
### 解决办法
 
修改文件eventlet\hubs\selects.py,然后把eventlet重新打包
 
    def wait(self, seconds=None):
        ....
        try:
            r, w, er = select.select(reader_fds, writer_fds, all_fds, seconds)
        # 捕捉异常并移除句柄-1
        except ValueError as e:
            self.remove_descriptor(-1)
            return
        except select.error as e:
            if support.get_errno(e) == errno.EINTR:
                return
            elif support.get_errno(e) in BAD_SOCK:
                self._remove_bad_fds()
                return
            else:
                raise
 

总结

 
1.很奇怪eventlet怎么没有修复这种兼容性错误?
 
2.模拟错误的方法  select.select([-1], [], [], 0)
 
3. socket中断句柄怎么变成了-1,非法的句柄-1又是如何进入eventlet的?
 
 

Tags: eventlet
评论(1) 阅读(4410)

eventlet绿化和patch原理

2020年11月21日 12:09

说明

 
eventlet是一个必备工具,经常用,绿化原理有点忘记了,重新复习一遍.
 
 

三个主要问题

 
1. 绿化的原理是什么?
 
2. 绿化怎么管理?
 
3. 绿化怎么引入?
 

绿化原理

 
利用select/epolls/kqueue等操作系统提供的非阻塞操作,将阻塞改为非阻塞.
 

引用管理

 
eventlet在import之后,将模块中的属性绿化.
 
用一小段代码来查看看
import sys
import eventlet
# eventlet.monkey_patch()

httplib2 = eventlet.import_patched('httplib2')
print(httplib2)
print(httplib2.socket)

print("================")
for k,v in sys.modules.items():
    if "socket" in k:
        print(k, v)


# 打印内容
<module 'httplib2' from 'D:\\workspace\\venv\\xxx\\lib\\site-packages\\httplib2\\__init__.py'>
<module 'eventlet.green.socket' from 'D:\\workspace\\venv\\xxx\\lib\\site-packages\\eventlet\\green\\socket.py'>
================
_socket <module '_socket' from 'c:\\python\\python36\\DLLs\\_socket.pyd'>
socket <module 'socket' from 'c:\\python\\python36\\lib\\socket.py'>
__original_module_socket <module 'socket' from 'c:\\python\\python36\\lib\\socket.py'>

# 说明了什么?
绿化只是替换httplib2模块中的引用属性socket。并未改变sys.modules中的属性
 

引入绿化方法一:直接import

 
from eventlet.green import socket
 

引入绿化方法二:eventlet.import_patch

 
1.好处: 能绿化模块内部的系统模块
2.只能绿化os, select, socket, thread, time, psycopy, MySQLdb, bultines, subprocess
3.如果是import threading,不会自动绿化, 需要from eventlet.green import threading
 
import eventlet
httplib2 = eventlet.import_patched('httplib2')
print(httplib2)
print(httplib2.socket)

# 打印
<module 'httplib2' from 'D:\\workspace\\venv\\autumn-secs\\lib\\site-packages\\httplib2\\__init__.py'>
<module 'eventlet.green.socket' from 'D:\\workspace\\venv\\autumn-secs\\lib\\site-packages\\eventlet\\green\\socket.py'>
 

引入绿化方法三:eventlet.monkey_patch

 
1.与import_patch不同,会直接修改sys.modules
2.但是也只能绿化os,select等等几个模块,其它模块需要用直接引用绿化
 
 
 

Tags: eventlet
评论(16) 阅读(2033)

eventlet并发读写socket出现Second simultaneous问题

2020年7月08日 10:50

 

描述

 
celery+eventlet实现任务调用时出现RuntimeError: Second simultaneous read on fileno 14 detected.  Unless you really know what you're doing, make sure that only one greenthread can read any particular socket
 
 

模拟错误

 
网上找的一段代码,模拟出同样问题
 
def main():
    import eventlet
    httplib2 = eventlet.import_patched('httplib2')
    shared_resource = httplib2.Http()

    def get_url():
        resp, content = shared_resource.request("http://eventlet.net")
        return content

    p = eventlet.GreenPile()
    p.spawn(get_url)
    p.spawn(get_url)
    results = list(p)
    assert results[0] == results[1]


if __name__ == "__main__":
    main()
 

错误内容

 
Traceback (most recent call last):
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/poll.py", line 111, in wait
    listener.cb(fileno)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenthread.py", line 221, in main
    result = function(*args, **kwargs)
  File "test.py", line 12, in get_url
    resp, content = shared_resource.request("http://eventlet.net")
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1994, in request
    cachekey,
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1651, in _request
    conn, request_uri, method, body, headers
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1558, in _conn_request
    conn.request(method, request_uri, body, headers)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 403, in sendall
    tail = self.send(data, flags)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 397, in send
    return self._send_loop(self.fd.send, data, flags)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 392, in _send_loop
    timeout_exc=_timeout_exc)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 210, in _trampoline
    mark_as_closed=self._mark_as_closed)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/__init__.py", line 157, in trampoline
    listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/epolls.py", line 22, in add
    listener = hub.BaseHub.add(self, evtype, fileno, cb, tb, mac)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/hub.py", line 181, in add
    evtype, fileno, evtype, cb, bucket[fileno]))
RuntimeError: Second simultaneous write on fileno 5 detected.  Unless you really know what you're doing, make sure that only one greenthread can write any particular socket.  Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of GreenThread object at 0x7f25df161680>; THAT THREAD=FdListener('write', 5, <built-in method switch of GreenThread object at 0x7f25df161470>, <built-in method throw of GreenThread object at 0x7f25df161470>)
 

 解决方法

 
猜测可能是没绿化完整,改成如下,果然就正常了。
 
def main():
    import eventlet
#     httplib2 = eventlet.import_patched('httplib2')
    import http2
    eventlet.monkey_patch()
    shared_resource = httplib2.Http()

    def get_url():
        resp, content = shared_resource.request("http://eventlet.net")
        return content

    p = eventlet.GreenPile()
    p.spawn(get_url)
    p.spawn(get_url)
    results = list(p)
    assert results[0] == results[1]


if __name__ == "__main__":
    main()
 

结论

  • 局部import的模块,可能使用了未被绿化的模块,导出eventlet出现问题
  • 延迟加载的模块配合eventlet也会出现同样问题
 

 

Tags: eventlet
评论(47) 阅读(3660)

eventlet在ubuntu上出现OSError protocol not found

2020年6月21日 09:24

描述

 
tensorflow的nvidia docker镜像使用ubuntu16.04, ubuntu是精简之后的,有些包可能没有。在上面运行eventlet会出现下面问题
 
 

错误内容

 
Traceback (most recent call last):
  File "/app/defect-client/defect_client/cmd/wafer-worker.py", line 14, in <module>
    import eventlet
  File "/usr/local/lib/python3.6/dist-packages/eventlet/__init__.py", line 10, in <module>
    from eventlet import convenience
  File "/usr/local/lib/python3.6/dist-packages/eventlet/convenience.py", line 7, in <module>
    from eventlet.green import socket
  File "/usr/local/lib/python3.6/dist-packages/eventlet/green/socket.py", line 21, in <module>
    from eventlet.support import greendns
  File "/usr/local/lib/python3.6/dist-packages/eventlet/support/greendns.py", line 69, in <module>
    setattr(dns.rdtypes.IN, pkg, import_patched('dns.rdtypes.IN.' + pkg))
  File "/usr/local/lib/python3.6/dist-packages/eventlet/support/greendns.py", line 59, in import_patched
    return patcher.import_patched(module_name, **modules)
  File "/usr/local/lib/python3.6/dist-packages/eventlet/patcher.py", line 126, in import_patched
    *additional_modules + tuple(kw_additional_modules.items()))
  File "/usr/local/lib/python3.6/dist-packages/eventlet/patcher.py", line 100, in inject
    module = __import__(module_name, {}, {}, module_name.split('.')[:-1])
  File "/usr/local/lib/python3.6/dist-packages/dns/rdtypes/IN/WKS.py", line 25, in <module>
    _proto_tcp = socket.getprotobyname('tcp')
OSError: protocol not found
 

解决办法

 
apt-get -o Dpkg::Options::="--force-confmiss" install --reinstall netbase
 

Tags: eventlet python
评论(48) 阅读(5277)