实用工具状态机transitions

2020年11月21日 09:42

说明

 
1. 状态机是一个非常实用的理论。在涉及到复杂的场景,建立状态机模型,能带来极大的方便。比如,网络连接、模型状态、业务逻辑。
 
2. 状态机并不复杂, 重要的是它的思想,能够极大减轻复杂度。使用时关键在于定义好事件和动作。
 
 

 基本概念

 
State: 状态
Event: 事件. 事件触发状态变换
Action: 动作. event发生前或后执行的动作
transition: 变换. 状态变换
 

github

 
https://github.com/pytransitions/transitions
 

安装

 
pip install transitions
 

简单示例

# 连接协议状态机
from transitions.extensions import HierarchicalMachine as Machine
from transitions.extensions.nesting import NestedState

class ConnectionStateMachine:
    """Connection state machine."""

    def __init__(self, callbacks=None):
        """
        :param callbacks: callbacks for the state machine
        """
        self.callbacks = {}

        # 定义状态
        self.states = ["STATE_NOT_CONNECTED",
                       {
                            'name': "STATE_CONNECTED",   # 状态名
                            'on_enter': self._on_enter_CONNECTED,  # 进入状态触发
                            'on_exit': self._on_exit_CONNECTED,    # 退出状态触发
                            'children': [     # 状态嵌套
                                "STATE_NOT_SELECTED",
                                {
                                    'name': "STATE_SELECTED",
                                    'on_enter': self._on_enter_CONNECTED_SELECTED
                                }
                            ]
                       }]

        # transition 1
        self.machine = Machine(model=self, states=self.states, initial="STATE_NOT_CONNECTED", auto_transitions=False)

        if callbacks:
            self.callbacks = callbacks

        # 定义状态变换
        self.machine.add_transition('connect', "STATE_NOT_CONNECTED", "STATE_CONNECTED_NOT_SELECTED")  # transition 2
        self.machine.add_transition('disconnect', "STATE_CONNECTED", "STATE_NOT_CONNECTED")  # transition 3
        self.machine.add_transition('select', "STATE_CONNECTED_NOT_SELECTED", "STATE_CONNECTED_SELECTED")  # transition 4
        self.machine.add_transition('deselect', "STATE_CONNECTED_SELECTED", "STATE_CONNECTED_NOT_SELECTED")  # transition 5
        self.machine.add_transition('timeoutT7', "STATE_CONNECTED_NOT_SELECTED", "STATE_NOT_CONNECTED")  # transition 6

    # 事件触发的动作
    def _on_enter_CONNECTED(self):
        if "on_enter_CONNECTED" in self.callbacks:
            self.callbacks["on_enter_CONNECTED"]()

    def _on_exit_CONNECTED(self):
        if "on_exit_CONNECTED" in self.callbacks:
            self.callbacks["on_exit_CONNECTED"]()

    def _on_enter_CONNECTED_SELECTED(self):
        if "on_enter_CONNECTED_SELECTED" in self.callbacks:
            self.callbacks["on_enter_CONNECTED_SELECTED"]()
 
 
 

定义状态机

 
# 一般都是两种:不嵌套、嵌套
from transitions import Machine   #不嵌套
from transitions.extensions import HierarchicalMachine as Machine # 嵌套

# 检查状态
machine.state
machine.is_<state_name>()
 

定义状态

# 状态可以是三种类型: 对象、字符串、字典
from transitions import State
states = [
    State(name="solid"),    # 对象
    "liquid",               # 字符串
    {"name": "gas"}         # 字典
]
 
 

定义变换

 
# 定义函数
machine.add_transition(trigger, source, dest, )
trigger(str)         方法名,触发状态转换
source(str or list)  源状态
dest(str)            目标状态

# 加上变换
machine.add_transition(trigger="wake_up", source="asleep", dest="hanging out")
machine.add_transition('work_out', 'hanging out', 'hungry')
machine.add_transition('nap', '*', 'asleep')   # 从任意状态变为asleep
 
 

 

Tags: 状态机
评论(62) 阅读(777)

恶念对自己伤害是1000分,对别人的伤害是1分

2020年10月08日 15:09

愤愤不平好几天,报复别人也许就几分钟。
反之,心怀感恩好几天,心情愉快,而报答别人又能又多长时间
 
 
心怀感恩的人很少,更何况成人的世界只有利益,没有好坏。
绝大部分时候,做好事没有好报,甚至相反
 
如果选择做好人又能得到什么?
1.平安。能够降低遇到灾祸的概率
2.善念时刻能带给自己利益。
 
 
所以,有智慧的人一定会选择做好人。
 

 

评论(40) 阅读(929)

eventlet并发读写socket出现Second simultaneous问题

2020年7月08日 10:50

 

描述

 
celery+eventlet实现任务调用时出现RuntimeError: Second simultaneous read on fileno 14 detected.  Unless you really know what you're doing, make sure that only one greenthread can read any particular socket
 
 

模拟错误

 
网上找的一段代码,模拟出同样问题
 
def main():
    import eventlet
    httplib2 = eventlet.import_patched('httplib2')
    shared_resource = httplib2.Http()

    def get_url():
        resp, content = shared_resource.request("http://eventlet.net")
        return content

    p = eventlet.GreenPile()
    p.spawn(get_url)
    p.spawn(get_url)
    results = list(p)
    assert results[0] == results[1]


if __name__ == "__main__":
    main()
 

错误内容

 
Traceback (most recent call last):
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/poll.py", line 111, in wait
    listener.cb(fileno)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenthread.py", line 221, in main
    result = function(*args, **kwargs)
  File "test.py", line 12, in get_url
    resp, content = shared_resource.request("http://eventlet.net")
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1994, in request
    cachekey,
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1651, in _request
    conn, request_uri, method, body, headers
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/httplib2/__init__.py", line 1558, in _conn_request
    conn.request(method, request_uri, body, headers)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/root/miniconda3/envs/defect/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 403, in sendall
    tail = self.send(data, flags)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 397, in send
    return self._send_loop(self.fd.send, data, flags)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 392, in _send_loop
    timeout_exc=_timeout_exc)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/greenio/base.py", line 210, in _trampoline
    mark_as_closed=self._mark_as_closed)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/__init__.py", line 157, in trampoline
    listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/epolls.py", line 22, in add
    listener = hub.BaseHub.add(self, evtype, fileno, cb, tb, mac)
  File "/root/miniconda3/envs/defect/lib/python3.7/site-packages/eventlet/hubs/hub.py", line 181, in add
    evtype, fileno, evtype, cb, bucket[fileno]))
RuntimeError: Second simultaneous write on fileno 5 detected.  Unless you really know what you're doing, make sure that only one greenthread can write any particular socket.  Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of GreenThread object at 0x7f25df161680>; THAT THREAD=FdListener('write', 5, <built-in method switch of GreenThread object at 0x7f25df161470>, <built-in method throw of GreenThread object at 0x7f25df161470>)
 

 解决方法

 
猜测可能是没绿化完整,改成如下,果然就正常了。
 
def main():
    import eventlet
#     httplib2 = eventlet.import_patched('httplib2')
    import http2
    eventlet.monkey_patch()
    shared_resource = httplib2.Http()

    def get_url():
        resp, content = shared_resource.request("http://eventlet.net")
        return content

    p = eventlet.GreenPile()
    p.spawn(get_url)
    p.spawn(get_url)
    results = list(p)
    assert results[0] == results[1]


if __name__ == "__main__":
    main()
 

结论

  • 局部import的模块,可能使用了未被绿化的模块,导出eventlet出现问题
  • 延迟加载的模块配合eventlet也会出现同样问题
 

 

Tags: eventlet
评论(245) 阅读(2427)

celery配合redis出现redis.exceptions.InvalidResponse Protocol Error

2020年6月21日 09:29

Tags: Redis Celery
评论(453) 阅读(3928)

eventlet在ubuntu上出现OSError protocol not found

2020年6月21日 09:24

描述

 
tensorflow的nvidia docker镜像使用ubuntu16.04, ubuntu是精简之后的,有些包可能没有。在上面运行eventlet会出现下面问题
 
 

错误内容

 
Traceback (most recent call last):
  File "/app/defect-client/defect_client/cmd/wafer-worker.py", line 14, in <module>
    import eventlet
  File "/usr/local/lib/python3.6/dist-packages/eventlet/__init__.py", line 10, in <module>
    from eventlet import convenience
  File "/usr/local/lib/python3.6/dist-packages/eventlet/convenience.py", line 7, in <module>
    from eventlet.green import socket
  File "/usr/local/lib/python3.6/dist-packages/eventlet/green/socket.py", line 21, in <module>
    from eventlet.support import greendns
  File "/usr/local/lib/python3.6/dist-packages/eventlet/support/greendns.py", line 69, in <module>
    setattr(dns.rdtypes.IN, pkg, import_patched('dns.rdtypes.IN.' + pkg))
  File "/usr/local/lib/python3.6/dist-packages/eventlet/support/greendns.py", line 59, in import_patched
    return patcher.import_patched(module_name, **modules)
  File "/usr/local/lib/python3.6/dist-packages/eventlet/patcher.py", line 126, in import_patched
    *additional_modules + tuple(kw_additional_modules.items()))
  File "/usr/local/lib/python3.6/dist-packages/eventlet/patcher.py", line 100, in inject
    module = __import__(module_name, {}, {}, module_name.split('.')[:-1])
  File "/usr/local/lib/python3.6/dist-packages/dns/rdtypes/IN/WKS.py", line 25, in <module>
    _proto_tcp = socket.getprotobyname('tcp')
OSError: protocol not found
 

解决办法

 
apt-get -o Dpkg::Options::="--force-confmiss" install --reinstall netbase
 

Tags: eventlet python
评论(149) 阅读(2319)

celery变量共享

2020年6月21日 09:19

问题

 
很多情况下我们想让task共享变量,该怎么做?
 

celery的并发原理

 
celery的并发任务池,有eventlet, gevent, prefork, thread类型
 
eventlet/gevent协程: 只有一个进程一个线程, 全局变量在task之间共享
    
prefork属于multiprocessing: multiprocessing全局变量也是共享的
 
thread多线程: 全局变量共享
    

 验证方法

 
用ab命令模拟大量并发,很容易测试出来
 
ab -n 1000 -c 100 -p ./post.txt -T application/json http://xxxx:5000/xxx
 

 结论

 
1. celery如果访问数据库, gpu等资源, 不用担心多次加载
 
2. 注意: 如果在task中初始化全局变量, 初始化较慢, 同时又收到大量task请求,可能会导致初始化多次
    
 
 

Tags: celery
评论(277) 阅读(1888)

protobuf序列化numpy

2020年6月15日 15:02

 

说明

 
protobuf处理不能直接处理numpy,需要先把numpy转为字节
 

 numpy转字节

 
import numpy as np
from io import BytesIO
A = np.array([ 
      1, 2, 3, 4, 4,
      2, 3, 4, 5, 3,
      4, 5, 6, 7, 2,
      5, 6, 7, 8, 9,
      6, 7, 8, 9, 0 ]).reshape(5,5)

   
# numpy 转bytes
nda_bytes = BytesIO()
np.save(nda_bytes, A, allow_pickle=False)

# bytes转numpy
nda_bytes = BytesIO(nda_bytes.getvalue())
B = np.load(nda_bytes, allow_pickle=False)
print(np.array_equal(A, B))
 

定义protobuf message

 
ndarray.proto
 
syntax = "proto3";

message NDArray {
    bytes ndarray = 1;
}
 

使用

 
from io import BytesIO
import numpy as np
import ndarray_pb2   #上面ndarray.proto编译成python


def ndarray_to_proto(nda: np.ndarray) -> NDArray:
    """
    numpy转proto
    """
    nda_bytes = BytesIO()
    np.save(nda_bytes, nda, allow_pickle=False)
    return NDArray(ndarray=nda_bytes.getvalue())


def proto_to_ndarray(nda_proto: NDArray) -> np.ndarray:
    nda_bytes = BytesIO(nda_proto.ndarray)
    return np.load(nda_bytes, allow_pickle=False)



A = np.array([ 
      1, 2, 3, 4, 4,
      2, 3, 4, 5, 3,
      4, 5, 6, 7, 2,
      5, 6, 7, 8, 9,
      6, 7, 8, 9, 0 ]).reshape(5,5)
serialized_A = ndarray_to_proto(A)
deserialized_A = proto_to_ndarray(serialized_A)
assert np.array_equal(A, deserialized_A)
 
 
 

Tags: protobuf
评论(465) 阅读(3493)

基于docker搭建cephfs分布式文件系统

2020年5月27日 12:32

 

目的 

 
在一台机器上, 利用多块硬盘, 搭建一个cephfs文件系统. 具体来说就是1个mon, 1个mds, 1个mgr, 3个osd
 

注意

 
a. 使用vmware会很方便
 
b. 安装过程中会遇到很多问题,我都没有记录, 尽量安装下面步骤
 

环境准备

 
a. vmware虚拟机fedora30
 
b. 添加3块虚拟机硬盘 /dev/sdb  /dev/sdc /dev/sdd (osd最少需要3个,需要有3块磁盘)
 
c. ceph容器版本 ceph/daemon:latest-luminous
 
 

搭建步骤

 
1. 下载镜像
 
docker pull ceph/daemon:latest-luminous
 
2. 挂载硬盘
vmware虚拟机添加硬盘很方便, 直接加就可以. fdisk -l 查看硬盘
 
3. 清理硬盘
 
# 格式化
mkfs.xfs /dev/sdb -f
mkfs.xfs /dev/sdc -f
mkfs.xfs /dev/sdd -f

# 如果已经是xfs格式, 上面命令并不能清除已有数据, 需要用zap_device清理
docker run -d --net=host --name=osd0  --rm \
--privileged=true \
-v /dev/:/dev/ \
-e OSD_DEVICE=/dev/sde  \
 ceph/daemon:latest-luminous zap_device
 
4. 准备目录
 
/root/ceph
/root/ceph/etc
/root/ceph/lib
 
 
5. 启动mon (监控节点必需)
 
docker run -d --net=host  --name=mon \
-v /root/ceph/etc:/etc/ceph \
-v /root/ceph/lib/:/var/lib/ceph/ \
-e MON_IP=192.168.10.125 \
-e CEPH_PUBLIC_NETWORK=192.168.10.0/24 \
 ceph/daemon:latest-luminous mon
 
6. 启动mgr(可以不用)
 
docker run -d --net=host --name=mgr \
-v /root/ceph/etc:/etc/ceph  \
-v /root/ceph/lib/:/var/lib/ceph  \
ceph/daemon:latest-luminous  mgr
 
7. 启动osd
 
# 修改-name和OSD_DEVICE启动三个osd
docker run -d --net=host --name=osd0   \
--privileged=true \
-v /root/ceph/etc:/etc/ceph  \
-v /root/ceph/lib/:/var/lib/ceph  \
-v /dev/:/dev/ \
-e OSD_DEVICE=/dev/sdb  \
-e OSD_TYPE=disk \
 ceph/daemon:latest-luminous osd
 
8. 启动mds(cephfs系统必需)
 
#  一定要在osd之后创建启动, 因为CEPHFS_CREATE=1会创建cephfs文件系统,受osd数量影响
docker run -d --net=host --name=mds \
-v /root/ceph/etc:/etc/ceph \
-v /root/ceph/lib/:/var/lib/ceph/ \
-e CEPHFS_CREATE=1 \      # 默认创建cephfs文件系统
  ceph/daemon:latest-luminous mds
 
9. 进入mon查看ceph状态
 
# 进入容器
docker exec -it mon bash

# 查看状态
[root@localhost /]# ceph -s
  cluster:
    id:     4d74fd53-84e0-47e6-a06c-5418e4b3b653
    health: HEALTH_WARN
            1 MDSs report slow metadata IOs
            2 osds down
            34/51 objects misplaced (66.667%)
            Reduced data availability: 4 pgs inactive, 16 pgs stale
            Degraded data redundancy: 16 pgs undersized
            too few PGs per OSD (4 < min 30)

  services:
    mon: 1 daemons, quorum localhost
    mgr: localhost(active)
    mds: cephfs-1/1/1 up  {0=localhost=up:creating}
    osd: 5 osds: 2 up, 4 in

  data:
    pools:   2 pools, 16 pgs
    objects: 17 objects, 2.19KiB
    usage:   4.01GiB used, 75.6GiB / 79.6GiB avail
    pgs:     25.000% pgs not active
             34/51 objects misplaced (66.667%)
             12 stale+active+undersized+remapped
             4  stale+undersized+peered
 
10. ceph调参: too few PGs per OSD (4 < min 30)
 
存储池的pg_num, pgp_num太小了, 设置大一点
 
ceph osd pool set cephfs_data pg_num 64
ceph osd pool set cephfs_data pgp_num 64
ceph osd pool set  cephfs_metadata pg_num 32
ceph osd pool set  cephfs_metadata pgp_num 32
 
11. ceph调参: mds: cephfs-1/1/1 up  {0=localhost=up:creating}
 
mds一直处在creating状态, 因为默认I/O需要的最小副本数是2,我们需要调成1
 
ceph osd pool set cephfs_metadata min_size 1
ceph osd pool set cephfs_data min_size 1
 
12. 再看ceph状态, mds状态是active表示cephfs搭建好了
 
mds: cephfs-1/1/1 up  {0=localhost=up:active}
 
13. 挂载cephfs目录(直接mount)
 
# 获取key
cat /root/ceph/etc/ceph.client.admin.keyring
# 直接挂载
mount -t ceph 192.168.10.125:6789:/ /root/abc -o name=admin,secret=AQAvoctebqeuBRAAp+FoatmQ5CUlSlo8dmvGAg==
# 取消挂载
umount /root/abc
 
14. 挂载cephfs目录(ceph-fuse)
 
# 安装ceph-fuse
yum install ceph-fuse

# 挂载(-k指定key -c表示配置文件)
ceph-fuse -m 192.168.10.125:6789 /root/abc1 -k /root/ceph/etc/ceph.client.admin.keyring  -c /root/ceph/etc/ceph.conf

#取消挂载
umount /root/abc1
 
15. 查看结果
 
df -h

192.168.10.125:6789:/                     18G     0   18G    0% /root/abc
ceph-fuse                                 18G     0   18G    0% /root/abc1
 

 

Tags: Ceph
评论(245) 阅读(8591)

tensorflow资源耗净 Resource exhausted OOM when allocating tensor with shape

2020年5月01日 13:02

描述

 
tensorflow跑训练集经常会遇到错误Resource exhausted: OOM when allocating tensor with shape[64,33,33,2048]
 

错误内容

tensorflow.python.framework.errors_impl.ResourceExhaustedError: 2 root error(s) found.
  (0) Resource exhausted: OOM when allocating tensor with shape[64,33,33,2048] and type float on /job:localhost/replica:0/task:0/device:GPU:0 by allocator GPU_0_bfc
[[node SecondStageBoxPredictor_1/ResizeBilinear (defined at /app/models/research/object_detection/predictors/heads/mask_head.py:149) ]]
Hint: If you want to see a list of allocated tensors when OOM happens, add report_tensor_allocations_upon_oom to RunOptions for current allocation info.

[[total_loss/_7771]]
Hint: If you want to see a list of allocated tensors when OOM happens, add report_tensor_allocations_upon_oom to RunOptions for current allocation info.

  (1) Resource exhausted: OOM when allocating tensor with shape[64,33,33,2048] and type float on /job:localhost/replica:0/task:0/device:GPU:0 by allocator GPU_0_bfc
[[node SecondStageBoxPredictor_1/ResizeBilinear (defined at /app/models/research/object_detection/predictors/heads/mask_head.py:149) ]]
Hint: If you want to see a list of allocated tensors when OOM happens, add report_tensor_allocations_upon_oom to RunOptions for current allocation info.

0 successful operations.
0 derived errors ignored.

Errors may have originated from an input operation.
Input Source operations connected to node SecondStageBoxPredictor_1/ResizeBilinear:
 SecondStageFeatureExtractor/resnet_v1_101/block4/unit_3/bottleneck_v1/Relu (defined at /app/models/research/slim/nets/resnet_v1.py:136)

Input Source operations connected to node SecondStageBoxPredictor_1/ResizeBilinear:
 SecondStageFeatureExtractor/resnet_v1_101/block4/unit_3/bottleneck_v1/Relu (defined at /app/models/research/slim/nets/resnet_v1.py:136)
 

原因

 
tensorflow在为张量shape[64,33,33,2048]分配gpu内存是发现资源不够。
 
假如数据类型是int8, 该张量需要的内存大小64*33*33*2048*1B=142737408B = 142.7MB
 

解决方法

 
1. 降低图片质量
2. batch_size改成1
3. 改用大内存的显卡
4. 增加显卡, 并行训练
 

 

Tags: tensorflow nvidia
评论(45) 阅读(4461)

tesla t4的坑Unable to load the kernel module 'nvidia.ko'.ipynb

2020年4月24日 15:35

说明

 
安nvidia tesla T4显卡遇到的坑, 在ubuntu16.04上安装t4会遇到下面错误
 

错误内容

 
   make[1]: Leaving directory '/usr/src/linux-headers-4.4.0-142-generic'
-> done.
-> Kernel module compilation complete.
ERROR: Unable to load the kernel module 'nvidia.ko'.  This happens most frequently when this kernel module was built against the wrong or improperly configured kernel sources, with a version of gcc that differs from the one used to build the target kernel, or if another driver, such as nouveau, is present and prevents the NVIDIA kernel module from obtaining ownership of the NVIDIA GPU(s), or no NVIDIA GPU installed in this system is supported by this NVIDIA Linux graphics driver release.
 

解决方法

 
t4不支持普通服务器,更换成刀片服务器
 

补充

 
  • 如果普通主机操作系统是win10,插上t4,安装驱动正常。
 
  • 安装nvidia 2080Ti 驱动,如果忘记插显卡电源线,会提示同样错误
 
 
 

Tags: tensorflow
评论(952) 阅读(15747)