Django3 uses WebSocket to implement WebShell


Recently, we need to develop the front-end function of operating remote virtual machines, referred to as WebShell. Based on the current technology stack of react+django, we found that most of the back-end implementations are django+channels to achieve websocket services.

After a general look at this is not interesting enough, looked through the official documentation of django and found that django does not natively support websocket, but after django3 supports the asgi protocol to achieve their own websocket services. So selected

gunicorn+uvicorn+asgi+websocket+django3.2+paramiko to implement WebShell.

Implementing websocket service

The project generated by django's own scaffolding will automatically generate two files and, most common applications use with nginx to deploy online services. This time we mainly use

The idea of implementing websocket service can be found roughly by searching the Internet, mainly to achieve connect/send/receive/disconnect the handling of several actions.

Here How to Add Websockets to a Django App without Extra Dependencies is a good example

, but too simple ........ :


 import os

 from django.core.asgi import get_asgi_application
 from websocket_app.websocket import websocket_application

os.environ.setdefault ('DJANGO_SETTINGS_MODULE', 'websocket_app.settings') 

django_application = get_asgi_application()

 async def application(scope, receive, send):
   if scope ['type'] == 'http': 
       await django_application(scope, receive, send)
   elif scope ['type'] == 'websocket': 
       await websocket_application(scope, receive, send)
       raise NotImplementedError(f "Unknown scope type {scope['type']}" )

 async def websocket_application(scope, receive, send):
 async def   websocket_application  (scope, receive, send):
   while True:
        event = await receive()

       if event ['type'] == 'websocket.connect': 
           await send({
               'type': 'websocket.accept' 

       if event ['type' ] == 'websocket.disconnect': 

       if event['type ' ] == 'websocket. receive': 
           if event ['text' ] == 'ping': 
               await send({
                   'type': 'websocket.send', 
                   'text': 'pong! 


The above code provides the idea, for a more complete implementation you can refer here websockets-in-django-3-1 which is basically reusable

I'll put the core of the implementation below:

 class WebSocket:
   def __init__(self, scope, receive, send):
        self._scope = scope
        self._receive = receive
        self._send = send
        self._client_state = State.CONNECTING
        self._app_state = State.CONNECTING

   def headers(self):
       return Headers(self._scope)

   def scheme( self):
       return self._scope ["scheme" ]

   def path( self):
       return self._scope[" path " ]

   def query_params( self):
       return QueryParams(self._scope ["query_string" ].decode())

   def query_string(self) -> str:
       return self._scope [ "query_string" ]

   def scope(self):
       return self._scope

   async def accept(self, subprotocol: str = None):
       """Accept connection.
        :param subprotocol: The subprotocol the server wishes to accept.
        :type subprotocol: str, optional
       if self._client_state == State.CONNECTING:
           await self.receive()
       await self.send({ "type": SendEvent.ACCEPT, "subprotocol": subprotocol})

   async def close(self, code: int = 1000 ):
       await self.send({ " type" : SendEvent.CLOSE, "code " : code})

   async def send(self, message: t.Mapping):
       if self._app_state == State.DISCONNECTED:
           raise RuntimeError ("WebSocket is disconnected." )

       if self._app_state == State.CONNECTING:
           assert message ["type" ] in {SendEvent.ACCEPT, SendEvent.CLOSE}, (
                   'Could not write event "%s" into socket in connecting state.'
                    % message [ "type" ]
           if message [ "type" ] == SendEvent.CLOSE:
                self._app_state = State.DISCONNECTED
                self._app_state = State.CONNECTED

       elif self._app_state == State.CONNECTED:
           assert message ["type" ] in {SendEvent.SEND, SendEvent.CLOSE}, (
                   'Connected socket can send "%s" and "%s" events, not "%s"'
                    % (SendEvent.SEND, SendEvent.CLOSE, message ["type" ])
           if message [" type" ] == SendEvent.CLOSE:
                self._app_state = State.DISCONNECTED

       await self._send(message)

   async def receive(self):
       if self._client_state == State.DISCONNECTED:
           raise RuntimeError ("WebSocket is disconnected." )

        message = await self._receive()

       if self._client_state == State.CONNECTING:
           assert message ["type" ] == ReceiveEvent.CONNECT, (
                   'WebSocket is in connecting state but received "%s" event'
                    % message [ "type" ]
            self._client_state = State.CONNECTED

       elif self._client_state == State.CONNECTED:
           assert message [" type" ] in {ReceiveEvent.RECEIVE, ReceiveEvent.DISCONNECT}, (
                   'WebSocket is connected but received invalid event "%s". '
                    % message ["type" ]
           if message [" type" ] == ReceiveEvent.DISCONNECT:
                self._client_state = State.DISCONNECTED

       return message

The stitching monster

As a qualified code mover, in order to improve the efficiency of handling or to build some wheels to fill some holes, how to combine the above WebSocket class with paramiko to achieve from the front-end to accept characters passed to the remote host and accept the return at the same time?

 import asyncio
 import traceback
 import paramiko
 from webshell.ssh import Base, RemoteSSH
 from webshell.connection import WebSocket

 class WebShell:
   """Organize WebSocket and paramiko.Channel to interoperate data between them.""" 

   def __init__(self, ws_session: WebSocket,
                 ssh_session: paramiko.SSHClient = None,
                 chanel_session: paramiko.Channel = None
        self.ws_session = ws_session
        self.ssh_session = ssh_session
        self.chanel_session = chanel_session

   def init_ssh(self, host=None, port=22, user= "admin", passwd= "admin@123" ):
        self.ssh_session, self.chanel_session = RemoteSSH(host, port, user, passwd).session()

   def set_ssh(self, ssh_session, chanel_session):
        self.ssh_session = ssh_session
        self.chanel_session = chanel_session

   async def ready(self):
       await self.ws_session.accept()

   async def welcome(self ):
       # Show Linux welcome related content 
       for i in range (2 ):
           if self.chanel_session.send_ready():
                message = self.chanel_session.recv (2048 ).decode ('utf-8') 
               if not message:
               await self.ws_session.send_text(message)

   async def web_to_ssh(self):
       # print('--------web_to_ssh------->')
       while True:
           # print('--------------->')
           if not or not self.ws_session.status:
           await asyncio.sleep (0.01 )
            shell = await self.ws_session.receive_text()
           # print('-------shell-------->', shell)
           if and self.chanel_session.send_ready():
                self.chanel_session.send(bytes(shell, 'utf-8') )
           # print('--------------->', "end")

   async def ssh_to_web(self):
       # print('<--------ssh_to_web-----------')
       while True:
           # print('<-------------------')
           if not
               await self.ws_session.send_text ('ssh closed') 
           if not self.ws_session.status:
           await asyncio.sleep (0.01 )
           if self.chanel_session.recv_ready():
                message = self.chanel_session.recv (2048 ).decode ('utf-8') 
               # print('<---------message----------', message)
               if not len(message):
               await self.ws_session.send_text(message)
           # print('<-------------------', "end")

   async def run(self):
       if not self.ssh_session:
           raise Exception ("ssh not init!" )
       await self.ready()
       await asyncio.gather(

   def clear(self):
       except Exception:
       except Exception:

The front end

xterm.js is perfectly fine, just search for something that looks simple.

export class Term extends React. component {
    private terminal!: HTMLDivElement;
    private fitAddon = new FitAddon();

    componentDidMount() {
        const xterm = new Terminal();
        xterm.loadAddon (this.fitAddon);
        xterm.loadAddon ( new WebLinksAddon());

       // using wss for https
       // const socket = new WebSocket ("ws://" + + "/api/v1/ws" );
        const socket = new WebSocket("ws ://localhost:8000/webshell/" );
       // socket.onclose = (event) => {
       // this.props.onClose();
       // }
        socket.onopen = ( event) => {
            xterm.loadAddon (new AttachAddon(socket));
        } (this.terminal);
        xterm.onResize (({ cols, rows }) => {
            socket.send ("<RESIZE>" + cols + "," + rows)

       window.addEventListener ('resize', this.onResize);

    componentWillUnmount() {
       window.removeEventListener('resize ', this.onResize);
    onResize = () => {;

    render() {
       return <div className= "Terminal" ref={(ref) => this.terminal = ref as HTMLDivElement}></div>;

Ok, enough of the nonsense, I'll put the code here webshell welcome star/fork!