在Python中构建WebSocket服务器和客户端

1. 安装

pip install websockets

2. 基本的例子

这是一个WebSocket服务器示例。

它从客户端读取名称,发送问候语并关闭连接。

#!/usr/bin/env python

# WS server example

import asyncio
import websockets

async def hello(websocket, path):
    name = await websocket.recv()
    print(f"< {name}")

    greeting = f"Hello {name}!"

    await websocket.send(greeting)
    print(f"> {greeting}")

start_server = websockets.serve(hello, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在服务器端, 为每个WebSocket连接websockets执行hello一次处理程序协同程序。它在处理程序协程返回时关闭连接。

这是一个相应的WebSocket客户端示例。

#!/usr/bin/env python

# WS client example

import asyncio
import websockets

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        name = input("What's your name? ")

        await websocket.send(name)
        print(f"> {name}")

        greeting = await websocket.recv()
        print(f"< {greeting}")

asyncio.get_event_loop().run_until_complete(hello())

使用connect()异步上下文管理器可确保在退出hello协程之前关闭连接。

3. 安全示例

安全的WebSocket连接可以提高机密性和可靠性,因为它们可以降低不良代理干扰的风险。

WSS协议是WS到HTTPS的HTTP:连接是使用传输层安全性(TLS)加密的 - 通常称为安全套接字层(SSL)。WSS需要像HTTPS这样的TLS证书。

以下是如何调整服务器示例以提供安全连接。请参阅ssl模块文档以安全地配置上下文。

#!/usr/bin/env python

# WSS (WS over TLS) server example, with a self-signed certificate

import asyncio
import pathlib
import ssl
import websockets

async def hello(websocket, path):
    name = await websocket.recv()
    print(f"< {name}")

    greeting = f"Hello {name}!"

    await websocket.send(greeting)
    print(f"> {greeting}")

ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
localhost_pem = pathlib.Path(__file__).with_name("localhost.pem")
ssl_context.load_cert_chain(localhost_pem)

start_server = websockets.serve(
    hello, "localhost", 8765, ssl=ssl_context
)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

以下是如何调整客户端。

#!/usr/bin/env python

# WSS (WS over TLS) client example, with a self-signed certificate

import asyncio
import pathlib
import ssl
import websockets

ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("localhost.pem")
ssl_context.load_verify_locations(localhost_pem)

async def hello():
    uri = "wss://localhost:8765"
    async with websockets.connect(
        uri, ssl=ssl_context
    ) as websocket:
        name = input("What's your name? ")

        await websocket.send(name)
        print(f"> {name}")

        greeting = await websocket.recv()
        print(f"< {greeting}")

asyncio.get_event_loop().run_until_complete(hello())

此客户端需要上下文,因为服务器使用自签名证书。

连接到具有有效证书的安全WebSocket服务器的客户端(即由您的Python安装信任的CA签名)可以简单地传递 ssl=True给connect()而不是构建上下文。

4. 基于浏览器的示例

这是一个如何运行WebSocket服务器并从浏览器连接的示例。

在控制台中运行此脚本:

#!/usr/bin/env python

# WS server that sends messages at random intervals

import asyncio
from time import ctime
import random
import websockets


async def time(websocket, path):
    while True:
        now = ctime() + ",本消息通过服务器websocket传输。"
        await websocket.send(now)   # 客户端可能随时关闭,要捕捉异常
        await asyncio.sleep(random.random() * 3)

start_server = websockets.serve(time, "127.0.0.1", 5678)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在 nginx 中配置websocket的反向代理:

    ### We want full access to SSL via backend ###
    location /websocket/ {
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        proxy_http_version 1.1;
        proxy_read_timeout   3600s; # 超时设置

        # 启用支持websocket连接
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_pass       http://127.0.0.1:5678;
    }

然后在浏览器中打开此HTML文件。

<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket 演示</title>
    </head>
    <body>
        <script>
            var ws = new WebSocket("ws://127.0.0.1:5678/"),
                messages = document.createElement('ul');
            ws.onmessage = function (event) {
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode(event.data);
                message.appendChild(content);
                messages.appendChild(message);
            };
            document.body.appendChild(messages);
        </script>
    </body>
</html>

写在Django中:

def webskt(request):
    # websocket 测试。
    html = """
<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket 演示</title>
    </head>
    <body>
        <script>
            var ws = new WebSocket("wss://www.hnbig.cn/websocket/"),
                messages = document.createElement('ul');
            ws.onmessage = function (event) {
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode(event.data);
                message.appendChild(content);
                messages.appendChild(message);
            };
            document.body.appendChild(messages);
        </script>
    </body>
</html>
    """
    return HttpResponse(html)

5. 同步示例

WebSocket服务器可以从客户端接收事件,处理它们以更新应用程序状态,并在客户端之间同步结果状态。

这是一个示例,任何客户端都可以递增或递减计数器。更新将传播到所有连接的客户端。

asyncio保证更新被序列化的并发模型。

在控制台中运行此脚本:

#!/usr/bin/env python

# WS server example that synchronizes state across clients

import asyncio
import json
import logging
import websockets

logging.basicConfig()

STATE = {"value": 0}

USERS = set()


def state_event():
    return json.dumps({"type": "state", **STATE})


def users_event():
    return json.dumps({"type": "users", "count": len(USERS)})


async def notify_state():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = state_event()
        await asyncio.wait([user.send(message) for user in USERS])


async def notify_users():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = users_event()
        await asyncio.wait([user.send(message) for user in USERS])


async def register(websocket):
    USERS.add(websocket)
    await notify_users()


async def unregister(websocket):
    USERS.remove(websocket)
    await notify_users()


async def counter(websocket, path):
    # register(websocket) sends user_event() to websocket
    await register(websocket)
    try:
        await websocket.send(state_event())
        async for message in websocket:
            data = json.loads(message)
            if data["action"] == "minus":
                STATE["value"] -= 1
                await notify_state()
            elif data["action"] == "plus":
                STATE["value"] += 1
                await notify_state()
            else:
                logging.error("unsupported event: {}", data)
    finally:
        await unregister(websocket)


start_server = websockets.serve(counter, "localhost", 6789)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

配置 Nginx 的反向代理:

    ### We want full access to SSL via backend ###
    location /websocket2/ {
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        proxy_http_version 1.1;
        proxy_read_timeout   3600s; # 超时设置

        # 启用支持websocket连接
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_pass       http://127.0.0.1:6789;
    }

然后在几个浏览器中打开此HTML文件。

<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket 演示2</title>
        <style type="text/css">
            body {
                font-family: "Courier New", sans-serif;
                text-align: center;
            }
            .buttons {
                font-size: 4em;
                display: flex;
                justify-content: center;
            }
            .button, .value {
                line-height: 1;
                padding: 2rem;
                margin: 2rem;
                border: medium solid;
                min-height: 1em;
                min-width: 1em;
            }
            .button {
                cursor: pointer;
                user-select: none;
            }
            .minus {
                color: red;
            }
            .plus {
                color: green;
            }
            .value {
                min-width: 2em;
            }
            .state {
                font-size: 2em;
            }
        </style>
    </head>
    <body>
        <div class="buttons">
            <div class="minus button">-</div>
            <div class="value">?</div>
            <div class="plus button">+</div>
        </div>
        <div class="state">
            <span class="users">?</span> online
        </div>
        <script>
            var minus = document.querySelector('.minus'),
                plus = document.querySelector('.plus'),
                value = document.querySelector('.value'),
                users = document.querySelector('.users'),
                websocket = new WebSocket("wss://www.hnbig.cn/websocket2/");
            minus.onclick = function (event) {
                websocket.send(JSON.stringify({action: 'minus'}));
            }
            plus.onclick = function (event) {
                websocket.send(JSON.stringify({action: 'plus'}));
            }
            websocket.onmessage = function (event) {
                data = JSON.parse(event.data);
                switch (data.type) {
                    case 'state':
                        value.textContent = data.value;
                        break;
                    case 'users':
                        users.textContent = (
                            data.count.toString() + " user" +
                            (data.count == 1 ? "" : "s"));
                        break;
                    default:
                        console.error(
                            "unsupported event", data);
                }
            };
        </script>
    </body>
</html>

在Django中添加:

def webskt2(request):
    """ websocket 演示2"""
    html2 = """
<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket 演示2</title>
        <style type="text/css">
            body {
                font-family: "Courier New", sans-serif;
                text-align: center;
            }
            .buttons {
                font-size: 4em;
                display: flex;
                justify-content: center;
            }
            .button, .value {
                line-height: 1;
                padding: 2rem;
                margin: 2rem;
                border: medium solid;
                min-height: 1em;
                min-width: 1em;
            }
            .button {
                cursor: pointer;
                user-select: none;
            }
            .minus {
                color: red;
            }
            .plus {
                color: green;
            }
            .value {
                min-width: 2em;
            }
            .state {
                font-size: 2em;
            }
        </style>
    </head>
    <body>
        <div class="buttons">
            <div class="minus button">-</div>
            <div class="value">?</div>
            <div class="plus button">+</div>
        </div>
        <div class="state">
            <span class="users">?</span> online
        </div>
        <script>
            var minus = document.querySelector('.minus'),
                plus = document.querySelector('.plus'),
                value = document.querySelector('.value'),
                users = document.querySelector('.users'),
                websocket = new WebSocket("wss://www.hnbig.cn/websocket2/");
            minus.onclick = function (event) {
                websocket.send(JSON.stringify({action: 'minus'}));
            }
            plus.onclick = function (event) {
                websocket.send(JSON.stringify({action: 'plus'}));
            }
            websocket.onmessage = function (event) {
                data = JSON.parse(event.data);
                switch (data.type) {
                    case 'state':
                        value.textContent = data.value;
                        break;
                    case 'users':
                        users.textContent = (
                            data.count.toString() + " user" +
                            (data.count == 1 ? "" : "s"));
                        break;
                    default:
                        console.error(
                            "unsupported event", data);
                }
            };
        </script>
    </body>
</html>
    """

    return HttpResponse(html2)

在多个浏览器中打开:https://www.hnbig.cn/polls/webskt2

6. 常见模式

您通常希望在连接的生命周期内处理多条消息。因此,您必须编写一个循环。以下是构建WebSocket服务器的基本模式。

6.1 消费者

用于接收消息并将它们传递给consumer协程:

async def consumer_handler(websocket, path):
    async for message in websocket:
        await consumer(message)

在此示例中,consumer表示用于处理在WebSocket连接上接收的消息的业务逻辑。

当客户端断开连接时,迭代终止。

6.2 制片人

从producer协程获取消息并发送它们:

async def producer_handler(websocket, path):
    while True:
        message = await producer()
        await websocket.send(message)

在此示例中,producer表示用于生成要在WebSocket连接上发送的消息的业务逻辑。

send()ConnectionClosed当客户端断开连接时会引发 异常,这会导致循环中断。while True

6.3 两者

您可以通过组合上面显示的两种模式并并行运行这两个任务来在同一连接上读取和写入消息:

async def handler(websocket, path):
    consumer_task = asyncio.ensure_future(
        consumer_handler(websocket, path))
    producer_task = asyncio.ensure_future(
        producer_handler(websocket, path))
    done, pending = await asyncio.wait(
        [consumer_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()

6.4 注册

如上面的同步示例所示,如果需要维护当前连接的客户端列表,则必须在连接时注册它们,并在断开连接时取消注册。

connected = set()

async def handler(websocket, path):
    # Register.
    connected.add(websocket)
    try:
        # Implement logic here.
        await asyncio.wait([ws.send("Hello!") for ws in connected])
        await asyncio.sleep(10)
    finally:
        # Unregister.
        connected.remove(websocket)

这个简单的示例跟踪内存中连接的客户端。这只有在您运行单个进程时才有效。在实际应用中,处理程序可以订阅消息代理上的某些频道。

7. 就这样!

websocketsAPI 的设计由简单性驱动。

您不必担心执行打开或关闭握手,回答ping或规范要求的任何其他行为。

websockets 在引擎盖下处理所有这些,所以你不必。

8. 还有一件事......

websockets 提供交互式客户端:

$ python -m websockets wss://www.hnbig.cn/websocket2/


9. 备忘单

9.1. 服务器端

  • 编写一个处理单个连接的协同程序。它接收WebSocket协议实例和参数中的URI路径。
    • 随时拨打电话recv(), send()接收和发送信息。
    • 何时recv()或 send()加注 ConnectionClosed,清理并退出。如果您启动了其他asyncio.Task,请在退出之前终止它们。
    • 如果您还在等待recv(),请考虑等待wait_closed() 连接关闭时快速检测。
    • 您可以ping()或 pong()如果您愿意,但一般不需要它。
  • 创建一个与serve()asyncio类似 的服务器create_server()。您还可以将其用作异步上下文管理器。
    • 服务器负责建立连接,然后让处理程序执行应用程序逻辑,最后在处理程序正常退出或异常后关闭连接。
    • 对于高级自定义,您可以子类化 WebSocketServerProtocol并传递此子类或工厂函数作为create_protocol参数。

9.2. 客户端

  • 创建一个与connect()asyncio类似 的客户端create_connection()。您还可以将其用作异步上下文管理器。
    • 对于高级自定义,您可以子类化 WebSocketClientProtocol并传递此子类或工厂函数作为create_protocol参数。
    • 随时拨打电话recv(), send()接收和发送信息。
    • 您可以ping()或 pong()如果您愿意,但一般不需要它。
    • 如果您不使用connect()上下文管理器,请调用 close()以终止连接。

9.3. 调试

如果您不明白websockets正在做什么,请启用日志记录:

import logging
logger = logging.getLogger('websockets')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

日志包含:
- ERROR级别的连接处理程序中的异常
- 在该INFO级别的开启或结束握手中的例外情况
- DEBUG级别上的所有帧- 这可能非常冗长

如果您是新手asyncio,您肯定会遇到与异步编程有关的问题,而不是websockets 特别是。幸运的是,Python的官方文档提供了使用asyncio进行开发的建议 。看看它:它是无价的!

9.4. 将其他参数传递给连接处理程序

编写服务器时,如果需要将其他参数传递给连接处理程序,可以将它们绑定到functools.partial():

import asyncio
import functools
import websockets

async def handler(websocket, path, extra_argument):
    ...

bound_handler = functools.partial(handler, extra_argument='spam')
start_server = websockets.serve(bound_handler, '127.0.0.1', 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

实现此结果的另一种方法是handler在extra_argument变量存在的范围内定义协程,而不是通过参数注入它。

更多参考 :

  1. Python websockets.connect() Examples
  2. WebSockets - Full Stack Python
  3. How to Create a Simple Python WebSocket Server Using Tornado
点赞

发表回复

电子邮件地址不会被公开。必填项已用 * 标注