Skip to content

websocket

The websocket proxy lib.

SUPPORTED_WS_HTTP_VERSIONS = ('1.1') module-attribute

The http versions that we supported now. It depends on httpx.

BaseWebSocketProxy

Bases: BaseProxyModel

Websocket proxy base class.

Attributes:

Name Type Description
client AsyncClient

The httpx.AsyncClient to establish websocket connection.

follow_redirects bool

Whether follow redirects of target server.

max_message_size_bytes int
queue_size int
keepalive_ping_interval_seconds Union[float, None]
keepalive_ping_timeout_seconds Union[float, None]
Tip

httpx_ws.aconnect_ws

Source code in src/fastapi_proxy_lib/core/websocket.py
class BaseWebSocketProxy(BaseProxyModel):
    """Websocket proxy base class.

    Attributes:
        client: The [`httpx.AsyncClient`](https://www.python-httpx.org/api/#asyncclient) to establish websocket connection.
        follow_redirects: Whether follow redirects of target server.
        max_message_size_bytes: refer to [httpx_ws.aconnect_ws][]
        queue_size: refer to [httpx_ws.aconnect_ws][]
        keepalive_ping_interval_seconds: refer to [httpx_ws.aconnect_ws][]
        keepalive_ping_timeout_seconds: refer to [httpx_ws.aconnect_ws][]

    Tip:
        [`httpx_ws.aconnect_ws`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.aconnect_ws)
    """

    client: httpx.AsyncClient
    follow_redirects: bool
    max_message_size_bytes: int
    queue_size: int
    keepalive_ping_interval_seconds: Union[float, None]
    keepalive_ping_timeout_seconds: Union[float, None]

    @override
    def __init__(
        self,
        client: Optional[httpx.AsyncClient] = None,
        *,
        follow_redirects: bool = False,
        max_message_size_bytes: int = DEFAULT_MAX_MESSAGE_SIZE_BYTES,
        queue_size: int = DEFAULT_QUEUE_SIZE,
        keepalive_ping_interval_seconds: Union[
            float, None
        ] = DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS,
        keepalive_ping_timeout_seconds: Union[
            float, None
        ] = DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS,
    ) -> None:
        """Http proxy base class.

        Args:
            client: The `httpx.AsyncClient` to establish websocket connection. Defaults to None.<br>
                If None, will create a new `httpx.AsyncClient`,
                else will use the given `httpx.AsyncClient`.
            follow_redirects: Whether follow redirects of target server. Defaults to False.

            max_message_size_bytes: refer to [httpx_ws.aconnect_ws][]
            queue_size: refer to [httpx_ws.aconnect_ws][]
            keepalive_ping_interval_seconds: refer to [httpx_ws.aconnect_ws][]
            keepalive_ping_timeout_seconds: refer to [httpx_ws.aconnect_ws][]

        Tip:
            [`httpx_ws.aconnect_ws`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.aconnect_ws)
        """
        self.max_message_size_bytes = max_message_size_bytes
        self.queue_size = queue_size
        self.keepalive_ping_interval_seconds = keepalive_ping_interval_seconds
        self.keepalive_ping_timeout_seconds = keepalive_ping_timeout_seconds
        super().__init__(client, follow_redirects=follow_redirects)

    @override
    async def send_request_to_target(  # pyright: ignore [reportIncompatibleMethodOverride]
        self,
        *,
        websocket: starlette_ws.WebSocket,
        target_url: httpx.URL,
    ) -> Union[Literal[False], StarletteResponse]:
        """Establish websocket connection for both client and target_url, then pass messages between them.

        - The http version of request must be in [`SUPPORTED_WS_HTTP_VERSIONS`][fastapi_proxy_lib.core.websocket.SUPPORTED_WS_HTTP_VERSIONS].

        Args:
            websocket: The client websocket requests.
            target_url: The url of target websocket server.

        Returns:
            If the establish websocket connection unsuccessfully:
                - Will call `websocket.close()` to send code `4xx`
                - Then return a `StarletteResponse` from target server
            If the establish websocket connection successfully:
                - Will run forever until the connection is closed. Then return False.
        """
        client = self.client
        follow_redirects = self.follow_redirects
        max_message_size_bytes = self.max_message_size_bytes
        queue_size = self.queue_size
        keepalive_ping_interval_seconds = self.keepalive_ping_interval_seconds
        keepalive_ping_timeout_seconds = self.keepalive_ping_timeout_seconds

        client_request_subprotocols: Union[List[str], None] = (
            _get_client_request_subprotocols(websocket.scope)
        )

        # httpx.stream()
        # refer to: https://www.python-httpx.org/api/#helper-functions
        client_request_headers: "HeaderTypes" = _change_client_header(
            headers=websocket.headers, target_url=target_url
        )
        client_request_params: "QueryParamTypes" = websocket.query_params

        # TODO: 是否可以不检查http版本?
        check_result = check_http_version(websocket.scope, SUPPORTED_WS_HTTP_VERSIONS)
        if check_result is not None:
            # NOTE: return 之前最好关闭websocket
            await websocket.close()
            return check_result

        # DEBUG: 用于调试的记录
        logging.debug(
            "WS: client:%s ; url:%s ; params:%s ; headers:%s",
            websocket.client,
            target_url,
            client_request_params,
            client_request_headers,
        )

        # https://github.com/frankie567/httpx-ws/discussions/11
        # https://docs.python.org/3.12/library/contextlib.html?highlight=asyncexitstack#catching-exceptions-from-enter-methods
        stack = AsyncExitStack()
        try:
            # FIX: https://github.com/WSH032/fastapi-proxy-lib/security/advisories/GHSA-7vwr-g6pm-9hc8
            # time cost: 396 ns ± 3.39 ns
            # 由于这不是原子性的操作,所以不保证一定阻止cookie泄漏
            # 一定能保证修复的方法是通过`_tool.change_necessary_client_header_for_httpx`强制指定优先级最高的cookie头
            client.cookies.clear()

            proxy_ws = await stack.enter_async_context(
                httpx_ws.aconnect_ws(
                    # 这个是httpx_ws类型注解的问题,其实是可以使用httpx.URL的
                    url=target_url,  # pyright: ignore [reportArgumentType]
                    client=client,
                    max_message_size_bytes=max_message_size_bytes,
                    queue_size=queue_size,
                    keepalive_ping_interval_seconds=keepalive_ping_interval_seconds,
                    keepalive_ping_timeout_seconds=keepalive_ping_timeout_seconds,
                    subprotocols=client_request_subprotocols,
                    # httpx.stream() params
                    # refer to: https://www.python-httpx.org/api/#helper-functions
                    headers=client_request_headers,
                    params=client_request_params,
                    follow_redirects=follow_redirects,
                )
            )
        except httpx_ws.WebSocketUpgradeError as e:
            # 这个错误是在 httpx.stream 获取到响应后才返回的, 也就是说至少本服务器的网络应该是正常的
            # 且对于反向ws代理来说,本服务器管理者有义务保证与目标服务器的连接是正常的
            # 所以这里既有可能是客户端的错误,或者是目标服务器拒绝了连接
            # TODO: 也有可能是本服务器的未知错误
            proxy_res = e.response

            # NOTE: return 之前最好关闭websocket
            # 不调用websocket.accept就发送关闭请求,uvicorn会自动发送403错误
            await websocket.close()
            # TODO: 连接失败的时候httpx_ws会自己关闭连接,但或许这里显式关闭会更好

            # HACK: 这里的返回的响应其实uvicorn不会处理
            return StreamingResponse(
                content=proxy_res.aiter_raw(),
                status_code=proxy_res.status_code,
                headers=proxy_res.headers,
            )

        # NOTE: 对于反向代理服务器,我们不返回 "任何" "具体的内部" 错误信息给客户端,因为这可能涉及到服务器内部的信息泄露

        # NOTE: 请使用 with 语句来 "保证关闭" AsyncWebSocketSession
        async with stack:
            # TODO: websocket.accept 中还有一个headers参数,但是httpx_ws不支持,考虑发起PR
            # https://github.com/frankie567/httpx-ws/discussions/53

            # FIXME: 调查缺少headers参数是否会引起问题,及是否会影响透明代理的无损转发性
            # https://asgi.readthedocs.io/en/latest/specs/www.html#accept-send-event

            # 这时候如果发生错误,退出时 stack 会自动关闭 httpx_ws 连接,所以这里不需要手动关闭
            await websocket.accept(
                subprotocol=proxy_ws.subprotocol
                # headers=...
            )

            client_to_server_task = asyncio.create_task(
                _wait_client_then_send_to_server(
                    client_ws=websocket,
                    server_ws=proxy_ws,
                ),
                name="client_to_server_task",
            )
            server_to_client_task = asyncio.create_task(
                _wait_server_then_send_to_client(
                    client_ws=websocket,
                    server_ws=proxy_ws,
                ),
                name="server_to_client_task",
            )
            # 保持强引用: https://docs.python.org/zh-cn/3.12/library/asyncio-task.html#creating-tasks
            task_group = _ClientServerProxyTask(
                client_to_server_task=client_to_server_task,
                server_to_client_task=server_to_client_task,
            )

            # NOTE: 考虑这两种情况:
            # 1. 如果一个任务在发送阶段退出:
            #   这意味着对应发送的ws已经关闭或者出错
            #   那么另一个任务很快就会在接收该ws的时候引发异常而退出
            #   很快,最终两个任务都结束
            #   **这时候pending 可能 为空,而done为两个任务**
            # 2. 如果一个任务在接收阶段退出:
            #   这意味着对应接收的ws已经关闭或者发生出错
            #   - 对于另一个任务的发送,可能会在发送的时候引发异常而退出
            #       - 可能指的是: wsproto后端的uvicorn发送消息永远不会出错
            #       - https://github.com/encode/uvicorn/discussions/2137
            #   - 对于另一个任务的接收,可能会等待很久,才能继续进行发送任务而引发异常而退出
            #   **这时候pending一般为一个未结束任务**
            #
            #   因为第二种情况的存在,所以需要用 wait_for 强制让其退出
            #   但考虑到第一种情况,先等它 1s ,看看能否正常退出
            try:
                _, pending = await asyncio.wait(
                    task_group,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                for (
                    pending_task
                ) in pending:  # NOTE: pending 一般为一个未结束任务,或者为空
                    # 开始取消未结束的任务
                    try:
                        await asyncio.wait_for(pending_task, timeout=1)
                    except asyncio.TimeoutError:
                        logging.debug(f"{pending} TimeoutError, it's normal.")
                    except Exception as e:
                        # 取消期间可能另一个ws会发生异常,这个是正常情况,且会被 asyncio.wait_for 传播
                        logging.debug(
                            f"{pending} raise error when being canceled, it's normal. error: {e}"
                        )
            except Exception as e:  # pragma: no cover # 这个是保险分支,通常无法执行
                logging.warning(
                    f"Something wrong, please contact the developer. error: {e}"
                )
                raise
            finally:
                # 无论如何都要关闭两个websocket
                # NOTE: 这时候两个任务都已经结束
                await _close_ws(
                    client_to_server_task=client_to_server_task,
                    server_to_client_task=server_to_client_task,
                    client_ws=websocket,
                    server_ws=proxy_ws,
                )
        return False

    @override
    async def proxy(*_: Any, **__: Any) -> NoReturn:
        """NotImplemented."""
        raise NotImplementedError()

__init__(client=None, *, follow_redirects=False, max_message_size_bytes=DEFAULT_MAX_MESSAGE_SIZE_BYTES, queue_size=DEFAULT_QUEUE_SIZE, keepalive_ping_interval_seconds=DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS, keepalive_ping_timeout_seconds=DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS)

Http proxy base class.

Parameters:

Name Type Description Default
client Optional[AsyncClient]

The httpx.AsyncClient to establish websocket connection. Defaults to None.
If None, will create a new httpx.AsyncClient, else will use the given httpx.AsyncClient.

None
follow_redirects bool

Whether follow redirects of target server. Defaults to False.

False
max_message_size_bytes int DEFAULT_MAX_MESSAGE_SIZE_BYTES
queue_size int DEFAULT_QUEUE_SIZE
keepalive_ping_interval_seconds Union[float, None] DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS
keepalive_ping_timeout_seconds Union[float, None] DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS
Tip

httpx_ws.aconnect_ws

Source code in src/fastapi_proxy_lib/core/websocket.py
@override
def __init__(
    self,
    client: Optional[httpx.AsyncClient] = None,
    *,
    follow_redirects: bool = False,
    max_message_size_bytes: int = DEFAULT_MAX_MESSAGE_SIZE_BYTES,
    queue_size: int = DEFAULT_QUEUE_SIZE,
    keepalive_ping_interval_seconds: Union[
        float, None
    ] = DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS,
    keepalive_ping_timeout_seconds: Union[
        float, None
    ] = DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS,
) -> None:
    """Http proxy base class.

    Args:
        client: The `httpx.AsyncClient` to establish websocket connection. Defaults to None.<br>
            If None, will create a new `httpx.AsyncClient`,
            else will use the given `httpx.AsyncClient`.
        follow_redirects: Whether follow redirects of target server. Defaults to False.

        max_message_size_bytes: refer to [httpx_ws.aconnect_ws][]
        queue_size: refer to [httpx_ws.aconnect_ws][]
        keepalive_ping_interval_seconds: refer to [httpx_ws.aconnect_ws][]
        keepalive_ping_timeout_seconds: refer to [httpx_ws.aconnect_ws][]

    Tip:
        [`httpx_ws.aconnect_ws`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.aconnect_ws)
    """
    self.max_message_size_bytes = max_message_size_bytes
    self.queue_size = queue_size
    self.keepalive_ping_interval_seconds = keepalive_ping_interval_seconds
    self.keepalive_ping_timeout_seconds = keepalive_ping_timeout_seconds
    super().__init__(client, follow_redirects=follow_redirects)

proxy(*_, **__) async

NotImplemented.

Source code in src/fastapi_proxy_lib/core/websocket.py
@override
async def proxy(*_: Any, **__: Any) -> NoReturn:
    """NotImplemented."""
    raise NotImplementedError()

send_request_to_target(*, websocket, target_url) async

Establish websocket connection for both client and target_url, then pass messages between them.

Parameters:

Name Type Description Default
websocket WebSocket

The client websocket requests.

required
target_url URL

The url of target websocket server.

required

Returns:

Type Description
Union[Literal[False], Response]

If the establish websocket connection unsuccessfully: - Will call websocket.close() to send code 4xx - Then return a StarletteResponse from target server

Union[Literal[False], Response]

If the establish websocket connection successfully: - Will run forever until the connection is closed. Then return False.

Source code in src/fastapi_proxy_lib/core/websocket.py
@override
async def send_request_to_target(  # pyright: ignore [reportIncompatibleMethodOverride]
    self,
    *,
    websocket: starlette_ws.WebSocket,
    target_url: httpx.URL,
) -> Union[Literal[False], StarletteResponse]:
    """Establish websocket connection for both client and target_url, then pass messages between them.

    - The http version of request must be in [`SUPPORTED_WS_HTTP_VERSIONS`][fastapi_proxy_lib.core.websocket.SUPPORTED_WS_HTTP_VERSIONS].

    Args:
        websocket: The client websocket requests.
        target_url: The url of target websocket server.

    Returns:
        If the establish websocket connection unsuccessfully:
            - Will call `websocket.close()` to send code `4xx`
            - Then return a `StarletteResponse` from target server
        If the establish websocket connection successfully:
            - Will run forever until the connection is closed. Then return False.
    """
    client = self.client
    follow_redirects = self.follow_redirects
    max_message_size_bytes = self.max_message_size_bytes
    queue_size = self.queue_size
    keepalive_ping_interval_seconds = self.keepalive_ping_interval_seconds
    keepalive_ping_timeout_seconds = self.keepalive_ping_timeout_seconds

    client_request_subprotocols: Union[List[str], None] = (
        _get_client_request_subprotocols(websocket.scope)
    )

    # httpx.stream()
    # refer to: https://www.python-httpx.org/api/#helper-functions
    client_request_headers: "HeaderTypes" = _change_client_header(
        headers=websocket.headers, target_url=target_url
    )
    client_request_params: "QueryParamTypes" = websocket.query_params

    # TODO: 是否可以不检查http版本?
    check_result = check_http_version(websocket.scope, SUPPORTED_WS_HTTP_VERSIONS)
    if check_result is not None:
        # NOTE: return 之前最好关闭websocket
        await websocket.close()
        return check_result

    # DEBUG: 用于调试的记录
    logging.debug(
        "WS: client:%s ; url:%s ; params:%s ; headers:%s",
        websocket.client,
        target_url,
        client_request_params,
        client_request_headers,
    )

    # https://github.com/frankie567/httpx-ws/discussions/11
    # https://docs.python.org/3.12/library/contextlib.html?highlight=asyncexitstack#catching-exceptions-from-enter-methods
    stack = AsyncExitStack()
    try:
        # FIX: https://github.com/WSH032/fastapi-proxy-lib/security/advisories/GHSA-7vwr-g6pm-9hc8
        # time cost: 396 ns ± 3.39 ns
        # 由于这不是原子性的操作,所以不保证一定阻止cookie泄漏
        # 一定能保证修复的方法是通过`_tool.change_necessary_client_header_for_httpx`强制指定优先级最高的cookie头
        client.cookies.clear()

        proxy_ws = await stack.enter_async_context(
            httpx_ws.aconnect_ws(
                # 这个是httpx_ws类型注解的问题,其实是可以使用httpx.URL的
                url=target_url,  # pyright: ignore [reportArgumentType]
                client=client,
                max_message_size_bytes=max_message_size_bytes,
                queue_size=queue_size,
                keepalive_ping_interval_seconds=keepalive_ping_interval_seconds,
                keepalive_ping_timeout_seconds=keepalive_ping_timeout_seconds,
                subprotocols=client_request_subprotocols,
                # httpx.stream() params
                # refer to: https://www.python-httpx.org/api/#helper-functions
                headers=client_request_headers,
                params=client_request_params,
                follow_redirects=follow_redirects,
            )
        )
    except httpx_ws.WebSocketUpgradeError as e:
        # 这个错误是在 httpx.stream 获取到响应后才返回的, 也就是说至少本服务器的网络应该是正常的
        # 且对于反向ws代理来说,本服务器管理者有义务保证与目标服务器的连接是正常的
        # 所以这里既有可能是客户端的错误,或者是目标服务器拒绝了连接
        # TODO: 也有可能是本服务器的未知错误
        proxy_res = e.response

        # NOTE: return 之前最好关闭websocket
        # 不调用websocket.accept就发送关闭请求,uvicorn会自动发送403错误
        await websocket.close()
        # TODO: 连接失败的时候httpx_ws会自己关闭连接,但或许这里显式关闭会更好

        # HACK: 这里的返回的响应其实uvicorn不会处理
        return StreamingResponse(
            content=proxy_res.aiter_raw(),
            status_code=proxy_res.status_code,
            headers=proxy_res.headers,
        )

    # NOTE: 对于反向代理服务器,我们不返回 "任何" "具体的内部" 错误信息给客户端,因为这可能涉及到服务器内部的信息泄露

    # NOTE: 请使用 with 语句来 "保证关闭" AsyncWebSocketSession
    async with stack:
        # TODO: websocket.accept 中还有一个headers参数,但是httpx_ws不支持,考虑发起PR
        # https://github.com/frankie567/httpx-ws/discussions/53

        # FIXME: 调查缺少headers参数是否会引起问题,及是否会影响透明代理的无损转发性
        # https://asgi.readthedocs.io/en/latest/specs/www.html#accept-send-event

        # 这时候如果发生错误,退出时 stack 会自动关闭 httpx_ws 连接,所以这里不需要手动关闭
        await websocket.accept(
            subprotocol=proxy_ws.subprotocol
            # headers=...
        )

        client_to_server_task = asyncio.create_task(
            _wait_client_then_send_to_server(
                client_ws=websocket,
                server_ws=proxy_ws,
            ),
            name="client_to_server_task",
        )
        server_to_client_task = asyncio.create_task(
            _wait_server_then_send_to_client(
                client_ws=websocket,
                server_ws=proxy_ws,
            ),
            name="server_to_client_task",
        )
        # 保持强引用: https://docs.python.org/zh-cn/3.12/library/asyncio-task.html#creating-tasks
        task_group = _ClientServerProxyTask(
            client_to_server_task=client_to_server_task,
            server_to_client_task=server_to_client_task,
        )

        # NOTE: 考虑这两种情况:
        # 1. 如果一个任务在发送阶段退出:
        #   这意味着对应发送的ws已经关闭或者出错
        #   那么另一个任务很快就会在接收该ws的时候引发异常而退出
        #   很快,最终两个任务都结束
        #   **这时候pending 可能 为空,而done为两个任务**
        # 2. 如果一个任务在接收阶段退出:
        #   这意味着对应接收的ws已经关闭或者发生出错
        #   - 对于另一个任务的发送,可能会在发送的时候引发异常而退出
        #       - 可能指的是: wsproto后端的uvicorn发送消息永远不会出错
        #       - https://github.com/encode/uvicorn/discussions/2137
        #   - 对于另一个任务的接收,可能会等待很久,才能继续进行发送任务而引发异常而退出
        #   **这时候pending一般为一个未结束任务**
        #
        #   因为第二种情况的存在,所以需要用 wait_for 强制让其退出
        #   但考虑到第一种情况,先等它 1s ,看看能否正常退出
        try:
            _, pending = await asyncio.wait(
                task_group,
                return_when=asyncio.FIRST_COMPLETED,
            )
            for (
                pending_task
            ) in pending:  # NOTE: pending 一般为一个未结束任务,或者为空
                # 开始取消未结束的任务
                try:
                    await asyncio.wait_for(pending_task, timeout=1)
                except asyncio.TimeoutError:
                    logging.debug(f"{pending} TimeoutError, it's normal.")
                except Exception as e:
                    # 取消期间可能另一个ws会发生异常,这个是正常情况,且会被 asyncio.wait_for 传播
                    logging.debug(
                        f"{pending} raise error when being canceled, it's normal. error: {e}"
                    )
        except Exception as e:  # pragma: no cover # 这个是保险分支,通常无法执行
            logging.warning(
                f"Something wrong, please contact the developer. error: {e}"
            )
            raise
        finally:
            # 无论如何都要关闭两个websocket
            # NOTE: 这时候两个任务都已经结束
            await _close_ws(
                client_to_server_task=client_to_server_task,
                server_to_client_task=server_to_client_task,
                client_ws=websocket,
                server_ws=proxy_ws,
            )
    return False

ReverseWebSocketProxy

Bases: BaseWebSocketProxy

Reverse http proxy.

Attributes:

Name Type Description
client AsyncClient

The httpx.AsyncClient to establish websocket connection.

base_url URL

The target proxy server url.

follow_redirects bool

Whether follow redirects of target server.

max_message_size_bytes int
queue_size int
keepalive_ping_interval_seconds Union[float, None]
keepalive_ping_timeout_seconds Union[float, None]
Tip

httpx_ws.aconnect_ws

There is a issue for handshake response:

This WebSocket proxy can correctly forward request headers. But currently, it is unable to properly forward responses from the target service, including successful handshake response headers and the entire response in case of a handshake failure.

In most cases, you don't need to worry about this. It only affects the HTTP handshake before establishing the WebSocket, and regular WebSocket messages will be forwarded correctly.

# Examples

from contextlib import asynccontextmanager
from typing import AsyncIterator

from fastapi import FastAPI
from fastapi_proxy_lib.core.websocket import ReverseWebSocketProxy
from httpx import AsyncClient
from starlette.websockets import WebSocket

proxy = ReverseWebSocketProxy(AsyncClient(), base_url="ws://echo.websocket.events/")

@asynccontextmanager
async def close_proxy_event(_: FastAPI) -> AsyncIterator[None]:
    """Close proxy."""
    yield
    await proxy.aclose()

app = FastAPI(lifespan=close_proxy_event)

@app.websocket("/{path:path}")
async def _(websocket: WebSocket, path: str = ""):
    return await proxy.proxy(websocket=websocket, path=path)

# Then run shell: `uvicorn <your.py>:app --host http://127.0.0.1:8000 --port 8000`
# visit the app: `ws://127.0.0.1:8000/`
# you can establish websocket connection with `ws://echo.websocket.events`
Source code in src/fastapi_proxy_lib/core/websocket.py
class ReverseWebSocketProxy(BaseWebSocketProxy):
    '''Reverse http proxy.

    Attributes:
        client: The `httpx.AsyncClient` to establish websocket connection.
        base_url: The target proxy server url.
        follow_redirects: Whether follow redirects of target server.
        max_message_size_bytes: refer to [httpx_ws.aconnect_ws][]
        queue_size: refer to [httpx_ws.aconnect_ws][]
        keepalive_ping_interval_seconds: refer to [httpx_ws.aconnect_ws][]
        keepalive_ping_timeout_seconds: refer to [httpx_ws.aconnect_ws][]

    Tip:
        [`httpx_ws.aconnect_ws`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.aconnect_ws)

    Bug: There is a issue for handshake response:
        This WebSocket proxy can correctly forward request headers.
        But currently,
        it is unable to properly forward responses from the target service,
        including successful handshake response headers and
        the entire response in case of a handshake failure.

        **In most cases, you don't need to worry about this.**
        It only affects the HTTP handshake before establishing the WebSocket,
        and regular WebSocket messages will be forwarded correctly.

    # # Examples

    ```python
    from contextlib import asynccontextmanager
    from typing import AsyncIterator

    from fastapi import FastAPI
    from fastapi_proxy_lib.core.websocket import ReverseWebSocketProxy
    from httpx import AsyncClient
    from starlette.websockets import WebSocket

    proxy = ReverseWebSocketProxy(AsyncClient(), base_url="ws://echo.websocket.events/")

    @asynccontextmanager
    async def close_proxy_event(_: FastAPI) -> AsyncIterator[None]:
        """Close proxy."""
        yield
        await proxy.aclose()

    app = FastAPI(lifespan=close_proxy_event)

    @app.websocket("/{path:path}")
    async def _(websocket: WebSocket, path: str = ""):
        return await proxy.proxy(websocket=websocket, path=path)

    # Then run shell: `uvicorn <your.py>:app --host http://127.0.0.1:8000 --port 8000`
    # visit the app: `ws://127.0.0.1:8000/`
    # you can establish websocket connection with `ws://echo.websocket.events`
    ```
    '''

    client: httpx.AsyncClient
    base_url: httpx.URL
    follow_redirects: bool
    max_message_size_bytes: int
    queue_size: int
    keepalive_ping_interval_seconds: Union[float, None]
    keepalive_ping_timeout_seconds: Union[float, None]

    @override
    def __init__(
        self,
        client: Optional[httpx.AsyncClient] = None,
        *,
        base_url: Union[httpx.URL, str],
        follow_redirects: bool = False,
        max_message_size_bytes: int = DEFAULT_MAX_MESSAGE_SIZE_BYTES,
        queue_size: int = DEFAULT_QUEUE_SIZE,
        keepalive_ping_interval_seconds: Union[
            float, None
        ] = DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS,
        keepalive_ping_timeout_seconds: Union[
            float, None
        ] = DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS,
    ) -> None:
        """Reverse http proxy.

        Note: please make sure `base_url` is available.
            Because when an error occurs,
            we cannot distinguish whether it is a proxy server network error, or it is a error of `base_url`.

        Args:
            base_url: The target proxy server url.
            client: The `httpx.AsyncClient` to establish websocket connection. Defaults to None.<br>
                If None, will create a new `httpx.AsyncClient`,
                else will use the given `httpx.AsyncClient`.
            follow_redirects: Whether follow redirects of target server. Defaults to False.

            max_message_size_bytes: refer to [httpx_ws.aconnect_ws][]
            queue_size: refer to [httpx_ws.aconnect_ws][]
            keepalive_ping_interval_seconds: refer to [httpx_ws.aconnect_ws][]
            keepalive_ping_timeout_seconds: refer to [httpx_ws.aconnect_ws][]

        Tip:
            [`httpx_ws.aconnect_ws`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.aconnect_ws)
        """
        self.base_url = check_base_url(base_url)
        super().__init__(
            client,
            follow_redirects=follow_redirects,
            max_message_size_bytes=max_message_size_bytes,
            queue_size=queue_size,
            keepalive_ping_interval_seconds=keepalive_ping_interval_seconds,
            keepalive_ping_timeout_seconds=keepalive_ping_timeout_seconds,
        )

    @override
    async def proxy(  # pyright: ignore [reportIncompatibleMethodOverride]
        self, *, websocket: starlette_ws.WebSocket, path: Optional[str] = None
    ) -> Union[Literal[False], StarletteResponse]:
        """Establish websocket connection for both client and target_url, then pass messages between them.

        Args:
            websocket: The client websocket requests.
            path: The path params of websocket request, which means the path params of base url.<br>
                If None, will get it from `websocket.path_params`.<br>
                **Usually, you don't need to pass this argument**.

        Returns:
            If the establish websocket connection unsuccessfully:
                - Will call `websocket.close()` to send code `4xx`
                - Then return a `StarletteResponse` from target server
            If the establish websocket connection successfully:
                - Will run forever until the connection is closed. Then return False.
        """
        base_url = self.base_url

        # 只取第一个路径参数。注意,我们允许没有路径参数,这代表直接请求
        path_param: str = (
            path if path is not None else next(iter(websocket.path_params.values()), "")
        )

        # 将路径参数拼接到目标url上
        # e.g: "https://www.example.com/p0/" + "p1"
        # NOTE: 这里的 path_param 是不带查询参数的,且允许以 "/" 开头 (最终为/p0//p1)
        target_url = base_url.copy_with(
            path=(base_url.path + path_param)
        )  # 耗时: 18.4 µs ± 262 ns

        # self.send_request_to_target 内部会处理连接失败时,返回错误给客户端,所以这里不处理了
        return await self.send_request_to_target(
            websocket=websocket, target_url=target_url
        )

__init__(client=None, *, base_url, follow_redirects=False, max_message_size_bytes=DEFAULT_MAX_MESSAGE_SIZE_BYTES, queue_size=DEFAULT_QUEUE_SIZE, keepalive_ping_interval_seconds=DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS, keepalive_ping_timeout_seconds=DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS)

Reverse http proxy.

please make sure base_url is available.

Because when an error occurs, we cannot distinguish whether it is a proxy server network error, or it is a error of base_url.

Parameters:

Name Type Description Default
base_url Union[URL, str]

The target proxy server url.

required
client Optional[AsyncClient]

The httpx.AsyncClient to establish websocket connection. Defaults to None.
If None, will create a new httpx.AsyncClient, else will use the given httpx.AsyncClient.

None
follow_redirects bool

Whether follow redirects of target server. Defaults to False.

False
max_message_size_bytes int DEFAULT_MAX_MESSAGE_SIZE_BYTES
queue_size int DEFAULT_QUEUE_SIZE
keepalive_ping_interval_seconds Union[float, None] DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS
keepalive_ping_timeout_seconds Union[float, None] DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS
Tip

httpx_ws.aconnect_ws

Source code in src/fastapi_proxy_lib/core/websocket.py
@override
def __init__(
    self,
    client: Optional[httpx.AsyncClient] = None,
    *,
    base_url: Union[httpx.URL, str],
    follow_redirects: bool = False,
    max_message_size_bytes: int = DEFAULT_MAX_MESSAGE_SIZE_BYTES,
    queue_size: int = DEFAULT_QUEUE_SIZE,
    keepalive_ping_interval_seconds: Union[
        float, None
    ] = DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS,
    keepalive_ping_timeout_seconds: Union[
        float, None
    ] = DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS,
) -> None:
    """Reverse http proxy.

    Note: please make sure `base_url` is available.
        Because when an error occurs,
        we cannot distinguish whether it is a proxy server network error, or it is a error of `base_url`.

    Args:
        base_url: The target proxy server url.
        client: The `httpx.AsyncClient` to establish websocket connection. Defaults to None.<br>
            If None, will create a new `httpx.AsyncClient`,
            else will use the given `httpx.AsyncClient`.
        follow_redirects: Whether follow redirects of target server. Defaults to False.

        max_message_size_bytes: refer to [httpx_ws.aconnect_ws][]
        queue_size: refer to [httpx_ws.aconnect_ws][]
        keepalive_ping_interval_seconds: refer to [httpx_ws.aconnect_ws][]
        keepalive_ping_timeout_seconds: refer to [httpx_ws.aconnect_ws][]

    Tip:
        [`httpx_ws.aconnect_ws`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.aconnect_ws)
    """
    self.base_url = check_base_url(base_url)
    super().__init__(
        client,
        follow_redirects=follow_redirects,
        max_message_size_bytes=max_message_size_bytes,
        queue_size=queue_size,
        keepalive_ping_interval_seconds=keepalive_ping_interval_seconds,
        keepalive_ping_timeout_seconds=keepalive_ping_timeout_seconds,
    )

proxy(*, websocket, path=None) async

Establish websocket connection for both client and target_url, then pass messages between them.

Parameters:

Name Type Description Default
websocket WebSocket

The client websocket requests.

required
path Optional[str]

The path params of websocket request, which means the path params of base url.
If None, will get it from websocket.path_params.
Usually, you don't need to pass this argument.

None

Returns:

Type Description
Union[Literal[False], Response]

If the establish websocket connection unsuccessfully: - Will call websocket.close() to send code 4xx - Then return a StarletteResponse from target server

Union[Literal[False], Response]

If the establish websocket connection successfully: - Will run forever until the connection is closed. Then return False.

Source code in src/fastapi_proxy_lib/core/websocket.py
@override
async def proxy(  # pyright: ignore [reportIncompatibleMethodOverride]
    self, *, websocket: starlette_ws.WebSocket, path: Optional[str] = None
) -> Union[Literal[False], StarletteResponse]:
    """Establish websocket connection for both client and target_url, then pass messages between them.

    Args:
        websocket: The client websocket requests.
        path: The path params of websocket request, which means the path params of base url.<br>
            If None, will get it from `websocket.path_params`.<br>
            **Usually, you don't need to pass this argument**.

    Returns:
        If the establish websocket connection unsuccessfully:
            - Will call `websocket.close()` to send code `4xx`
            - Then return a `StarletteResponse` from target server
        If the establish websocket connection successfully:
            - Will run forever until the connection is closed. Then return False.
    """
    base_url = self.base_url

    # 只取第一个路径参数。注意,我们允许没有路径参数,这代表直接请求
    path_param: str = (
        path if path is not None else next(iter(websocket.path_params.values()), "")
    )

    # 将路径参数拼接到目标url上
    # e.g: "https://www.example.com/p0/" + "p1"
    # NOTE: 这里的 path_param 是不带查询参数的,且允许以 "/" 开头 (最终为/p0//p1)
    target_url = base_url.copy_with(
        path=(base_url.path + path_param)
    )  # 耗时: 18.4 µs ± 262 ns

    # self.send_request_to_target 内部会处理连接失败时,返回错误给客户端,所以这里不处理了
    return await self.send_request_to_target(
        websocket=websocket, target_url=target_url
    )