How eventlet greening and patching work

November 21, 2020 12:09

Notes

 
eventlet is an essential tool that I use all the time, but I had half-forgotten how greening works, so here is a refresher.
 
 

Three main questions

 
1. How does greening work?
 
2. How are the greened references managed?
 
3. How do you bring greening into your code?
 

How greening works

 
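Greening relies on the non-blocking I/O facilities provided by the operating system (select/epoll/kqueue, etc.) to turn blocking calls into non-blocking ones: the socket is put into non-blocking mode, and when a call would block, the current greenthread registers the file descriptor with eventlet's hub and switches away; the hub switches back once the descriptor is ready, so the caller still sees an ordinary blocking call.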
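To make that concrete, here is a stdlib-only teaching sketch of the same mechanism. It is not eventlet's hub, and the names reader, writer and run are invented for this example. Each task is a generator that yields (event, sock) whenever an operation would block, and the scheduler waits on all parked sockets with one selectors.DefaultSelector (epoll/kqueue/select underneath), resuming a task once the OS reports its socket ready.

```python
import selectors
import socket

# Teaching sketch of a green "hub" (NOT eventlet's code): a task yields
# (event, sock) when it would block; the hub parks it in one selector and
# resumes it once the OS reports the socket ready.
EVENTS = {"read": selectors.EVENT_READ, "write": selectors.EVENT_WRITE}

def reader(name, sock, out):
    sock.setblocking(False)
    while True:
        try:
            data = sock.recv(1024)
        except BlockingIOError:
            yield ("read", sock)          # would block: hand control to the hub
            continue
        if not data:                      # peer closed the connection: task done
            return
        out.append((name, data))

def writer(sock, payloads):
    sock.setblocking(False)
    for payload in payloads:
        yield ("write", sock)             # wait until the socket is writable
        sock.sendall(payload)             # small payloads fit in the send buffer
    sock.close()

def run(tasks):
    sel = selectors.DefaultSelector()
    ready = list(tasks)                   # tasks that can be stepped right now
    while ready or len(sel.get_map()):
        for task in ready:
            try:
                event, sock = next(task)
            except StopIteration:
                continue                  # task finished
            # register() raises KeyError if another task already waits on sock
            sel.register(sock, EVENTS[event], task)
        ready = []
        if len(sel.get_map()):            # never select() on an empty set
            for key, _ in sel.select():
                sel.unregister(key.fileobj)
                ready.append(key.data)    # the parked task can continue
    sel.close()
```

Run with a reader and a writer on each end of two socket.socketpair() pairs, both readers wait concurrently in a single thread. Registering a second task on a socket that is already being waited on fails here, which is roughly the situation eventlet reports as "Second simultaneous read/write".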
 

Reference management

 
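When eventlet imports a module this way (import_patched), it greens the attributes inside that module, i.e. the references it holds to standard-library modules.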
 
A short snippet to check this:
import sys
import eventlet
# eventlet.monkey_patch()

httplib2 = eventlet.import_patched('httplib2')
print(httplib2)
print(httplib2.socket)

print("================")
for k,v in sys.modules.items():
    if "socket" in k:
        print(k, v)


# Output
<module 'httplib2' from 'D:\\workspace\\venv\\xxx\\lib\\site-packages\\httplib2\\__init__.py'>
<module 'eventlet.green.socket' from 'D:\\workspace\\venv\\xxx\\lib\\site-packages\\eventlet\\green\\socket.py'>
================
_socket <module '_socket' from 'c:\\python\\python36\\DLLs\\_socket.pyd'>
socket <module 'socket' from 'c:\\python\\python36\\lib\\socket.py'>
__original_module_socket <module 'socket' from 'c:\\python\\python36\\lib\\socket.py'>

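# What does this show?
Greening only replaces the socket reference inside the httplib2 module. It does not change sys.modules: the global socket entry is still the original module, and eventlet also keeps the original under __original_module_socket.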
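A simplified sketch of that idea, assuming a temporary swap of sys.modules entries is roughly what happens (this is not eventlet's code): while a module's source runs, sys.modules["socket"] points at a substitute, so the module binds the substitute; afterwards the original entry is restored for everyone else. load_with_substitutes and the stand-in module are hypothetical.

```python
import socket  # the real module, already loaded as in the output above
import sys
import types

# Source of a tiny "library" that imports socket at module level.
CLIENT_SOURCE = "import socket\n\ndef which():\n    return socket.__name__\n"

def load_with_substitutes(name, source, substitutes):
    """Execute `source` as a new module while sys.modules holds substitutes."""
    saved = {key: sys.modules.get(key) for key in substitutes}
    sys.modules.update(substitutes)              # `import socket` now finds the stand-in
    try:
        module = types.ModuleType(name)
        exec(compile(source, name, "exec"), module.__dict__)
    finally:
        for key, original in saved.items():      # restore the global view
            if original is None:
                sys.modules.pop(key, None)
            else:
                sys.modules[key] = original
    return module

fake_socket = types.ModuleType("green_socket_stand_in")  # hypothetical green socket
client = load_with_substitutes("client", CLIENT_SOURCE, {"socket": fake_socket})
```

client.socket ends up as the stand-in while sys.modules["socket"] is still the real module, which matches the printed result above.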
 

Greening method 1: import directly

 
from eventlet.green import socket
 

Greening method 2: eventlet.import_patched

 
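1. Advantage: it also greens the system modules that the target module imports internally.
2. It only greens os, select, socket, thread, time, psycopg, MySQLdb, builtins, subprocess.
3. import threading is not greened automatically; use from eventlet.green import threading.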
 
import eventlet
httplib2 = eventlet.import_patched('httplib2')
print(httplib2)
print(httplib2.socket)

# Output
<module 'httplib2' from 'D:\\workspace\\venv\\autumn-secs\\lib\\site-packages\\httplib2\\__init__.py'>
<module 'eventlet.green.socket' from 'D:\\workspace\\venv\\autumn-secs\\lib\\site-packages\\eventlet\\green\\socket.py'>
 

Greening method 3: eventlet.monkey_patch

 
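1. Unlike import_patched, it modifies the modules registered in sys.modules in place, so the change is global.
2. It still only covers os, select and a few other modules; anything else has to be imported from eventlet.green directly.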
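A minimal sketch of in-place patching, which is roughly how monkey_patch behaves as I understand it: it rebinds attributes on the module objects already in sys.modules, so any code that looks up time.sleep at call time gets the replacement. fake_sleep, patch_attr and some_library_function are hypothetical stand-ins.

```python
import time

calls = []

def fake_sleep(seconds):
    calls.append(seconds)            # record the call instead of blocking

def patch_attr(module, name, replacement):
    """Rebind module.name in place and return the original object."""
    original = getattr(module, name)
    setattr(module, name, replacement)
    return original

def some_library_function():
    time.sleep(5)                    # attribute looked up on the module at call time

original_sleep = patch_attr(time, "sleep", fake_sleep)
try:
    some_library_function()          # returns immediately; the call is recorded
finally:
    time.sleep = original_sleep      # undo the patch
```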
 
 
 

Tags: eventlet

A handy tool: the transitions state machine library

November 21, 2020 09:42

Notes

 
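1. The state machine is a very practical concept. In complex scenarios such as network connections, model states and business logic, building a state machine model makes things much easier.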
 
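2. State machines are not complicated; what matters is the way of thinking, which greatly reduces complexity. The key is to define the events and actions well.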
 
 

Basic concepts

 
State: a state
Event: an event; events trigger state changes
Action: an action executed before or after an event
Transition: a change from one state to another
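The four concepts can be sketched without the transitions library, using a plain dict as the transition table. The states, events and the Connection class are illustrative only.

```python
# (current state, event) -> next state
TRANSITIONS = {
    ("not_connected", "connect"): "connected",
    ("connected", "select"): "selected",
    ("selected", "deselect"): "connected",
    ("connected", "disconnect"): "not_connected",
    ("selected", "disconnect"): "not_connected",
}

class Connection:
    def __init__(self):
        self.state = "not_connected"
        self.log = []                              # record of actions performed

    def fire(self, event):
        key = (self.state, event)
        if key not in TRANSITIONS:
            raise ValueError(f"event {event!r} not allowed in state {self.state!r}")
        new_state = TRANSITIONS[key]
        self.log.append(f"exit {self.state}")      # action before the change
        self.state = new_state                     # the transition itself
        self.log.append(f"enter {new_state}")      # action after the change
        return new_state
```

An unknown (state, event) pair is rejected instead of silently ignored, which is the behavior you usually want for protocol states.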
 

github

 
https://github.com/pytransitions/transitions
 

Installation

 
pip install transitions
 

Simple example

# Connection protocol state machine
from transitions.extensions import HierarchicalMachine as Machine
from transitions.extensions.nesting import NestedState

class ConnectionStateMachine:
    """Connection state machine."""

    def __init__(self, callbacks=None):
        """
        :param callbacks: callbacks for the state machine
        """
        self.callbacks = {}

        # Define the states
        self.states = ["STATE_NOT_CONNECTED",
                       {
                            'name': "STATE_CONNECTED",   # state name
                            'on_enter': self._on_enter_CONNECTED,  # called when entering the state
                            'on_exit': self._on_exit_CONNECTED,    # called when leaving the state
                            'children': [     # nested (child) states
                                "STATE_NOT_SELECTED",
                                {
                                    'name': "STATE_SELECTED",
                                    'on_enter': self._on_enter_CONNECTED_SELECTED
                                }
                            ]
                       }]

        # transition 1
        self.machine = Machine(model=self, states=self.states, initial="STATE_NOT_CONNECTED", auto_transitions=False)

        if callbacks:
            self.callbacks = callbacks

        # Define the transitions
        self.machine.add_transition('connect', "STATE_NOT_CONNECTED", "STATE_CONNECTED_NOT_SELECTED")  # transition 2
        self.machine.add_transition('disconnect', "STATE_CONNECTED", "STATE_NOT_CONNECTED")  # transition 3
        self.machine.add_transition('select', "STATE_CONNECTED_NOT_SELECTED", "STATE_CONNECTED_SELECTED")  # transition 4
        self.machine.add_transition('deselect', "STATE_CONNECTED_SELECTED", "STATE_CONNECTED_NOT_SELECTED")  # transition 5
        self.machine.add_transition('timeoutT7', "STATE_CONNECTED_NOT_SELECTED", "STATE_NOT_CONNECTED")  # transition 6

    # Actions (callbacks) run on state entry/exit
    def _on_enter_CONNECTED(self):
        if "on_enter_CONNECTED" in self.callbacks:
            self.callbacks["on_enter_CONNECTED"]()

    def _on_exit_CONNECTED(self):
        if "on_exit_CONNECTED" in self.callbacks:
            self.callbacks["on_exit_CONNECTED"]()

    def _on_enter_CONNECTED_SELECTED(self):
        if "on_enter_CONNECTED_SELECTED" in self.callbacks:
            self.callbacks["on_enter_CONNECTED_SELECTED"]()
 
 
 

Defining the state machine

 
# There are two common kinds: flat and nested
from transitions import Machine   # flat
from transitions.extensions import HierarchicalMachine as Machine  # nested

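# Check the current state
machine.state                # with Machine(model=obj, ...), read obj.state instead
machine.is_<state_name>()    # likewise obj.is_<state_name>()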
 

Defining states

# A state can be given in three forms: a State object, a string, or a dict
from transitions import State
states = [
    State(name="solid"),    # State object
    "liquid",               # string
    {"name": "gas"}         # dict
]
 
 

Defining transitions

 
# Signature
machine.add_transition(trigger, source, dest)
trigger (str)          name of the method that triggers the transition
source (str or list)   source state(s)
dest (str)             destination state

# Adding transitions
machine.add_transition(trigger="wake_up", source="asleep", dest="hanging out")
machine.add_transition('work_out', 'hanging out', 'hungry')
machine.add_transition('nap', '*', 'asleep')   # from any state to asleep
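To show what add_transition does conceptually, here is a hypothetical MiniMachine. It is not the transitions API, only a sketch that accepts a single source, a list of sources, or "*", and turns each trigger into a method on the object.

```python
class MiniMachine:
    """Hypothetical toy machine; NOT the transitions library."""

    def __init__(self, states, initial):
        self.states = list(states)
        self.state = initial
        self._table = {}                      # trigger -> {source: dest}

    def add_transition(self, trigger, source, dest):
        if source == "*":                     # any state known right now
            sources = self.states
        elif isinstance(source, str):
            sources = [source]
        else:
            sources = list(source)
        self._table.setdefault(trigger, {}).update({s: dest for s in sources})
        # Expose the trigger as a method, e.g. m.wake_up()
        setattr(self, trigger, lambda: self._fire(trigger))

    def _fire(self, trigger):
        targets = self._table[trigger]
        if self.state not in targets:
            raise RuntimeError(f"can't {trigger} from {self.state}")
        self.state = targets[self.state]
        return True
```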
 
 

 

Tags: state machine

"Second simultaneous" error when reading/writing a socket concurrently with eventlet

July 8, 2020 10:50

 

Description

 
When calling tasks with celery + eventlet, this error appeared: RuntimeError: Second simultaneous read on fileno 14 detected.  Unless you really know what you're doing, make sure that only one greenthread can read any particular socket
 
 

Reproducing the error

 
This snippet, found online, reproduces the same problem:
 
def main():
    import eventlet
    httplib2 = eventlet.import_patched('httplib2')
    shared_resource = httplib2.Http()

    def get_url():
        resp, content = shared_resource.request("http://eventlet.net")
        return content

    p = eventlet.GreenPile()
    p.spawn(get_url)
    p.spawn(get_url)
    results = list(p)
    assert results[0] == results[1]


if __name__ == "__main__":
    main()
 

Error output

 
Traceback (most recent call last):
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/poll.py", line 111, in wait
    listener.cb(fileno)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenthread.py", line 221, in main
    result = function(*args, **kwargs)
  File "test.py", line 12, in get_url
    resp, content = shared_resource.request("http://eventlet.net")
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1994, in request
    cachekey,
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1651, in _request
    conn, request_uri, method, body, headers
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1558, in _conn_request
    conn.request(method, request_uri, body, headers)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 403, in sendall
    tail = self.send(data, flags)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 397, in send
    return self._send_loop(self.fd.send, data, flags)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 392, in _send_loop
    timeout_exc=_timeout_exc)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 210, in _trampoline
    mark_as_closed=self._mark_as_closed)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/__init__.py", line 157, in trampoline
    listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/epolls.py", line 22, in add
    listener = hub.BaseHub.add(self, evtype, fileno, cb, tb, mac)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/hub.py", line 181, in add
    evtype, fileno, evtype, cb, bucket[fileno]))
RuntimeError: Second simultaneous write on fileno 5 detected.  Unless you really know what you're doing, make sure that only one greenthread can write any particular socket.  Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of GreenThread object at 0x7f25df161680>; THAT THREAD=FdListener('write', 5, <built-in method switch of GreenThread object at 0x7f25df161470>, <built-in method throw of GreenThread object at 0x7f25df161470>)
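The error message suggests giving each greenthread its own socket through a pool (eventlet ships eventlet.pools.Pool for this). The sketch below shows the same pattern with plain threads and queue.Queue so it runs without eventlet; Pool and FakeConnection are hypothetical stand-ins, with FakeConnection playing the role of httplib2.Http.

```python
import queue
import threading
from contextlib import contextmanager

class FakeConnection:
    """Hypothetical stand-in for an httplib2.Http object."""
    def __init__(self, ident):
        self.ident = ident

    def request(self, url):
        return f"conn{self.ident}:{url}"

class Pool:
    """Hands each resource to exactly one worker at a time."""
    def __init__(self, create, size):
        self._items = queue.Queue()
        for i in range(size):
            self._items.put(create(i))

    @contextmanager
    def item(self):
        conn = self._items.get()     # blocks until a resource is free
        try:
            yield conn
        finally:
            self._items.put(conn)    # always hand it back, even on errors

pool = Pool(FakeConnection, size=2)
results = []
results_lock = threading.Lock()

def get_url(url):
    with pool.item() as conn:        # exclusive use inside this block
        body = conn.request(url)
    with results_lock:
        results.append(body)

threads = [threading.Thread(target=get_url, args=(f"/page{i}",)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Six workers share two connections, but never the same one at the same moment, which is exactly the condition the error message asks for.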
 

Fix

 
My guess was that greening was incomplete. After changing the code as follows, it worked.
 
def main():
    import eventlet
    eventlet.monkey_patch()  # patch the standard library before anything else imports it
#     httplib2 = eventlet.import_patched('httplib2')
    import httplib2
    shared_resource = httplib2.Http()

    def get_url():
        resp, content = shared_resource.request("http://eventlet.net")
        return content

    p = eventlet.GreenPile()
    p.spawn(get_url)
    p.spawn(get_url)
    results = list(p)
    assert results[0] == results[1]


if __name__ == "__main__":
    main()
 

Conclusions

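  • Modules imported locally (inside a function) may use modules that were never greened, which breaks eventlet
  • Lazily loaded modules cause the same problem with eventlet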
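One plausible mechanism behind these conclusions, stated as an assumption since the post does not pin down the exact cause: a name bound before patching, e.g. via from time import sleep, keeps pointing at the original object even after the module attribute is patched. green_sleep is a hypothetical stand-in for a green function.

```python
import time
from time import sleep as early_bound_sleep   # reference copied before patching

def green_sleep(seconds):
    """Hypothetical stand-in for a green (cooperative) sleep."""
    return None

original = time.sleep
time.sleep = green_sleep                       # patch applied "too late"
try:
    late_lookup = time.sleep                   # looked up after the patch: green
    # The early-bound name still points at the blocking original.
    bound_early_is_original = early_bound_sleep is original
finally:
    time.sleep = original                      # restore the module attribute
```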
 

 

Tags: eventlet

Celery with Redis raises redis.exceptions.InvalidResponse Protocol Error

June 21, 2020 09:29

Tags: Redis Celery

Sharing variables in Celery

June 21, 2020 09:19

Problem

 
We often want tasks to share variables. How do we do that?
 

How Celery concurrency works

 
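Celery's worker pools include eventlet, gevent, prefork and threads.
 
eventlet/gevent (coroutines): a single process with a single thread, so globals are shared between tasks
    
prefork (multiprocessing): tasks running in the same child process share its globals, but each child process has its own copy
 
threads (multithreading): globals are shared between tasks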
    

How to verify

 
Simulating heavy concurrency with the ab command makes this easy to observe:
 
ab -n 1000 -c 100 -p ./post.txt -T application/json http://xxxx:5000/xxx
 

Conclusions

 
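1. If Celery tasks use resources such as databases or GPUs, there is no need to worry about them being loaded for every task: a global is loaded once per process (with prefork, once in each child process)
 
2. Caveat: if a global is initialized inside a task, the initialization is slow, and a burst of tasks arrives at the same time, it may be initialized several times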
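A sketch of the second point and one common remedy, double-checked locking, using plain threads. load_model is a hypothetical stand-in for opening a database pool or loading a GPU model. Under eventlet/gevent the lock has to be a green one (for example threading.Lock after monkey-patching); that is an assumption about your setup.

```python
import threading
import time

init_count = 0
_model = None
_model_lock = threading.Lock()

def load_model():
    """Hypothetical slow initialization (DB pool, GPU model, ...)."""
    global init_count
    init_count += 1
    time.sleep(0.05)                 # simulate a slow load
    return object()

def get_model():
    global _model
    if _model is None:               # fast path: no lock once initialized
        with _model_lock:
            if _model is None:       # re-check: another worker may have won the race
                _model = load_model()
    return _model

def task(results):
    results.append(get_model())

results = []
workers = [threading.Thread(target=task, args=(results,)) for _ in range(20)]
for w in workers:
    w.start()
for w in workers:
    w.join()
```

Without the lock and the second check, several of the 20 workers can see _model as None during the 50 ms load and each start their own initialization.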
    
 
 

Tags: celery

Serializing numpy arrays with protobuf

June 15, 2020 15:02

 

Notes

 
protobuf cannot handle numpy arrays directly; the array has to be converted to bytes first.
 

numpy to bytes

 
import numpy as np
from io import BytesIO
A = np.array([ 
      1, 2, 3, 4, 4,
      2, 3, 4, 5, 3,
      4, 5, 6, 7, 2,
      5, 6, 7, 8, 9,
      6, 7, 8, 9, 0 ]).reshape(5,5)

   
# numpy -> bytes
nda_bytes = BytesIO()
np.save(nda_bytes, A, allow_pickle=False)

# bytes -> numpy
nda_bytes = BytesIO(nda_bytes.getvalue())
B = np.load(nda_bytes, allow_pickle=False)
print(np.array_equal(A, B))
 

Defining the protobuf message

 
ndarray.proto
 
syntax = "proto3";

message NDArray {
    bytes ndarray = 1;
}
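For intuition about what the generated NDArray class puts on the wire, here is a hand-rolled encoder/decoder for this one message (illustration only; real code should use ndarray_pb2). A bytes field is written as a key varint ((field_number << 3) | wire type 2, i.e. 0x0A for field 1), then the payload length as a varint, then the raw bytes.

```python
def encode_varint(value):
    """Base-128 varint: 7 bits per byte, high bit set while more bytes follow."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def decode_varint(data, pos):
    """Return (value, next_position)."""
    shift = result = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7

def encode_ndarray_message(payload):
    key = (1 << 3) | 2                  # field 1, wire type 2 (length-delimited)
    return encode_varint(key) + encode_varint(len(payload)) + payload

def decode_ndarray_message(data):
    key, pos = decode_varint(data, 0)
    if key != 0x0A:
        raise ValueError("expected field 1 with wire type 2")
    length, pos = decode_varint(data, pos)
    return data[pos:pos + length]
```

So the message adds only a few bytes of framing around the np.save output; the array's dtype and shape travel inside that payload.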
 

Usage

 
from io import BytesIO
import numpy as np
from ndarray_pb2 import NDArray   # generated from ndarray.proto above, e.g. protoc --python_out=. ndarray.proto


def ndarray_to_proto(nda: np.ndarray) -> NDArray:
    """
    Convert a numpy array to an NDArray message
    """
    nda_bytes = BytesIO()
    np.save(nda_bytes, nda, allow_pickle=False)
    return NDArray(ndarray=nda_bytes.getvalue())


def proto_to_ndarray(nda_proto: NDArray) -> np.ndarray:
    nda_bytes = BytesIO(nda_proto.ndarray)
    return np.load(nda_bytes, allow_pickle=False)



A = np.array([ 
      1, 2, 3, 4, 4,
      2, 3, 4, 5, 3,
      4, 5, 6, 7, 2,
      5, 6, 7, 8, 9,
      6, 7, 8, 9, 0 ]).reshape(5,5)
serialized_A = ndarray_to_proto(A)
deserialized_A = proto_to_ndarray(serialized_A)
assert np.array_equal(A, deserialized_A)
 
 
 

Tags: protobuf

Using socketio together with apscheduler

April 28, 2019 10:16

 

Notes

 
The Flask project uses flask-socketio for websocket communication and also needs flask-apscheduler for scheduled jobs.
 
 

Problem description

 
At first the project only had socketio and ran under gunicorn:
 
gunicorn --worker-class eventlet -w 1 zhima_chat:app -b 0.0.0.0:5000 --access-logfile -
 
Later apscheduler had to be added.
 
Run the same way as above, it broke. How do you combine socketio with APScheduler?
 
 

Tags: flask apscheduler python socketio

Console output not visible when running gunicorn in docker

April 6, 2019 07:59

Problem description

 
The project is built into a docker image and run with gunicorn. Why doesn't print output show up in the docker logs?
 
 
* The docker-compose file:
 
version: '2'
services:
  xxx.xxx:
    image: xxx:1.0
    volumes:
        - /usr/share/zoneinfo/Asia/Shanghai:/etc/localtime
        - ./log/xxx:/var/log/xxx
        - ./xxx:/app
    ports:
      - "5000:5000"
    environment:
      - FLASK_ENV=development_wyq
    command: gunicorn --worker-class eventlet -w 1 xxx:app -b 0.0.0.0:5000      # print output does not reach the console
    #command: gunicorn --worker-class eventlet -w 1 xxx:app -b 0.0.0.0:5000 --access-logfile -   # print output reaches the console
 
 

Fix

 
Add "--access-logfile -" to the command.
 
 

Tags: docker flask websocket gunicorn

Sending mail with a QQ mailbox as a personal mail server

April 6, 2019 07:47

 Requirement
    Email notifications are very useful. How can I set up a mail server that lets me send email?
 

Approaches

 
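1. Build your own mail server with open-source software.
2. Use a third-party mail service such as QQ Mail.
 
Running my own mail server is a hassle and my needs are simple: I only need to send mail through QQ Mail. Here is how.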
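A minimal sketch with Python's smtplib, assuming SMTP has been enabled in the QQ Mail settings and the authorization code it issues is used as the password. smtp.qq.com over SSL on port 465 is the commonly documented setting, but check it for your account; build_message and send_mail are hypothetical helpers.

```python
import smtplib
from email.message import EmailMessage

SMTP_HOST = "smtp.qq.com"   # assumed QQ Mail SMTP host
SMTP_PORT = 465             # assumed SSL port

def build_message(sender, to, subject, body):
    """Create a plain-text email."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = to
    msg["Subject"] = subject
    msg.set_content(body)
    return msg

def send_mail(sender, auth_code, to, subject, body):
    """Log in with the mailbox address and authorization code, then send."""
    msg = build_message(sender, to, subject, body)
    with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=10) as server:
        server.login(sender, auth_code)
        server.send_message(msg)
```

For example, send_mail("you@qq.com", "<authorization code>", "someone@example.com", "job finished", "done") would send one plain-text message; the addresses are placeholders.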
 
 

Tags: operations

Securing API endpoints the simplest way, nearly impossible to crack

January 1, 2019 11:45

 

Scenario

 
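The project exposes API endpoints to third parties, and they touch core features. How do we keep them secure and prevent identity forgery and data tampering?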
 

Idea

 
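The best way to protect data is, of course, encryption: content that cannot be read cannot be forged or tampered with.

But commercial HTTPS certificates cost money. Is there another way?

Yes:

message authentication with a keyed hash (the HMAC idea).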
 
 

Algorithm

 
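*  Caller
 
1. When calling the API, sort the parameters by key and join them as key1=value1&key2=value2&....&secret_key=...
2. Compute the MD5 of that string to get the signature, sign
3. Add sign to the request parameters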
 
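* Callee
 
The callee issues the secret key, so it also holds the caller's secret_key.
1. Look up secret_key by app_id
2. Build key1=value1&key2=value2....&secret_key=... from the request parameters (excluding sign itself) using the same rule
3. Compute the MD5 of that string to get sign and compare it with the received sign; if they match, authentication succeeds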
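A sketch of the callee side under a few assumptions: app_id travels as an ordinary request parameter and is included in the signed string, the signature arrives as a parameter named sign, and secrets live in a hypothetical SECRETS dict. hmac.compare_digest is used so the comparison time does not reveal how many characters matched.

```python
import hashlib
import hmac

SECRETS = {"123": "xxxxxxxx"}          # hypothetical app_id -> secret_key store

def make_sign(params, secret_key):
    """MD5 over key-sorted key=value pairs with secret_key appended last."""
    pairs = ["%s=%s" % (k, v) for k, v in sorted(params.items())]
    pairs.append("secret_key=%s" % secret_key)
    return hashlib.md5("&".join(pairs).encode("utf-8")).hexdigest()

def verify(request_params):
    params = dict(request_params)
    received = params.pop("sign", "")  # the signature itself is not signed
    secret_key = SECRETS.get(str(params.get("app_id", "")))
    if secret_key is None:             # unknown caller
        return False
    expected = make_sign(params, secret_key)
    return hmac.compare_digest(expected, received)
```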
 

Effect

 
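1. The secret key exists only on the two parties' machines and is never sent over the network, so it cannot be intercepted
2. Without the key the signature cannot be forged, so identities cannot be faked and messages cannot be tampered with
 
With this signing in place, cracking the API is practically impossible. Note that it authenticates requests but does not encrypt them: the parameters are still visible, and a captured request can be replayed unless a timestamp or nonce is also signed and checked.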
 

Python implementation (Python 3)

 
import hashlib

app_id = 123
secret_key = "xxxxxxxx"

request_param = dict(
    key1="value1",
    key2="value2",
    key3="value3"
)

def sign():
    # Sort the parameters by key and format them as key=value
    params = ["%s=%s" % (key, value) for key, value in sorted(request_param.items(), key=lambda item: item[0])]
    # Append the shared secret last, then join with "&"
    params.append("secret_key=%s" % secret_key)
    str_param = "&".join(params)
    print(str_param)
    # MD5 works on bytes, so encode the string first
    md = hashlib.md5()
    md.update(str_param.encode("utf-8"))
    return md.hexdigest()

if __name__ == '__main__':
    print(sign())
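Strictly speaking, the scheme above is a keyed MD5 hash (secret appended to the input) rather than standard HMAC. If you want real HMAC, Python's hmac module provides it; this sketch signs the same sorted key=value string with HMAC-SHA256 instead of appending the secret. canonical_string, hmac_sign and hmac_verify are hypothetical names.

```python
import hashlib
import hmac

def canonical_string(params):
    """key1=value1&key2=value2... sorted by key (no secret inside)."""
    return "&".join("%s=%s" % (k, v) for k, v in sorted(params.items()))

def hmac_sign(params, secret_key):
    return hmac.new(secret_key.encode("utf-8"),
                    canonical_string(params).encode("utf-8"),
                    hashlib.sha256).hexdigest()

def hmac_verify(params, secret_key, received_sign):
    # Constant-time comparison of the expected and received signatures
    return hmac.compare_digest(hmac_sign(params, secret_key), received_sign)
```

The secret is used as the HMAC key instead of being mixed into the string, and the resulting signature is 64 hex characters.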
 
 
 

 

Tags: web python