返回

websocket与协程的前世今生(初步)

简单调试python-websocket通信中发生的协程调用链

首先建议你了解下事件驱动模型

https://www.cnblogs.com/wdliu/p/6890930.html

Python线程、协程探究(3)——协程的调度实现大龙

https://zhuanlan.zhihu.com/p/96969508

python cookbook 11.12 理解事件驱动的IO

https://python3-cookbook.readthedocs.io/zh_CN/latest/c11/p12_understanding_event_driven_io.html

需要注意的是,协程并不是万能的,由于系统事件循环的限制,所以文件IO一般还是使用多线程来执行,具体见:github.com/python/asyn…

协程的关键在于future、task。

与Coroutine只有让步和接收结果不同的是Future除了让步和接收结果功能外,它还是一个只会被动进行事件调用且带有状态的容器,它在初始化时就是Pending状态,这时可以被取消,被设置结果和设置异常。而在被设定对应的操作后,Future会被转化到一个不可逆的对应状态,并通过loop.call_sonn来调用所有注册到本身上的回调函数,同时它带有__iter____await__方法使其可以被awaityield from调用。

Task是Future的子类,除了继承了Future的所有方法,它还多了两个重要的方法__step__wakeup,通过这两个方法赋予了Task调度能力,这是Coroutine和Future没有的。

本次调试的问题在于:“为什么当下面的接受代码结束(break)后,发送代码仍然会陷入阻塞?又是为什么可以将他重新唤醒?”

# socket_start.py
import asyncio
import websockets
import json
from loguru import logger
async def sen_data(websocket, path):
    logger.error("初始化完成")
    while 1:
        data1 = {"value":1}
        data1 = json.dumps(data1).encode('utf-8')
        try:
            await websocket.send(data1)
        except Exception:
            logger.debug("重新启动")
            break
    logger.debug("结束了!")

start_serve1 = websockets.serve(sen_data, "127.0.0.1", 1122, ping_interval=None)
asyncio.get_event_loop().run_until_complete(start_serve1)
asyncio.get_event_loop().run_forever()
# socket_get.py
import json
import asyncio
import websockets
import time
from loguru import logger

async def hello(uri):
    async with websockets.connect(uri, ping_interval=None) as websocket:
        while True:
            recv_text = await websocket.recv()
            print("> {}".format(recv_text))
            print(time.time())
            time.sleep(1)

asyncio.get_event_loop().run_until_complete(
    hello('ws://127.0.0.1:1122'))  # 地址改为自己的地址

为了搞清楚,需要画一个时序状态机:

时序状态机绘制地址:

https://tooltt.com/sequence/

时序绘制代码:

Note right of socket\_start: -> 开始执行程序
Note right of socket\_start: self.\_run\_once()
Note right of socket\_start: (base\_events.py)self.\_selector.select(timeout)
Note right of socket\_start: ([selectors.py](http://selectors.py "selectors.py"))self.*selector.poll(timeout, max\_ev)
Note right of socket\_start: 陷入阻塞
Note left of socket\_get: 开始执行程序 <-
Note left of socket\_get: aenter
Note left of socket\_get: await
Note left of socket\_get: await\_impl\_timeout
Note left of socket\_get: await\_impl
Note left of socket\_get: protocol handshake  start
Note left of socket\_get: write\_http\_request
Note left of socket\_get: self.transport.write(request.encode())
socket\_get->socket\_start: self.transport.write(request.encode())
socket\_get->socket\_start: asyncio/selector\_events.py
socket\_get->socket\_start: self.* sock.send(data)
Note right of socket\_start: 跳出阻塞
Note right of socket\_start: ([selectors.py](http://selectors.py "selectors.py"))for fd, event in fd\_event\_list:
Note right of socket\_start: 之后反复运行self.\_ context.run(self.\_callback, \*self.\_args)

另外的一些细节:

启动sen_data函数相关的协程:

(【socket_start文件】此时应当在asyncio.get_event_loop().run_forever())

【socket_start文件】接收到【socket_get文件的connect后执行如下片段】

connection_made await self.handshake start hand shake !!!!!! read_http_request process_request process_origin process_extensions process_subprotocol write_http_response handshake

此时开始执行协程相关项

reference

Python的可等待对象在Asyncio的作用

https://so1n.me/2022/04/11/python’s_waitable_objects_in_asyncio/index.html

Python Asyncio调度原理()

https://juejin.cn/post/7108367062463938597

如何给所有的async函数添加try/catch?(语法分析词法分析)

https://juejin.cn/post/7155434131831128094

事件循环和协程:从生成器到协程(手写简单的协程实现)

https://zhuanlan.zhihu.com/p/31634491

HTTP、TCP与UDP、Socket与Websocket之间的联系与区别

https://segmentfault.com/a/1190000037620675

Licensed under CC BY-NC-SA 4.0
Built with Hugo
Theme Stack designed by Jimmy