GIF89; GIF89; %PDF- %PDF-
__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# -*- test-case-name: twisted.test.test_strports -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Construct listening port services from a simple string description.
@see: L{twisted.internet.endpoints.serverFromString}
@see: L{twisted.internet.endpoints.clientFromString}
"""
from typing import Optional, cast
from twisted.application.internet import StreamServerEndpointService
from twisted.internet import endpoints, interfaces
def _getReactor() -> interfaces.IReactorCore:
    """
    Return the global reactor.

    The import is performed inside the function rather than at module level,
    so merely importing this module does not import
    L{twisted.internet.reactor}.

    @rtype: L{twisted.internet.interfaces.IReactorCore}
    @return: the object importable as C{twisted.internet.reactor}, cast to
        L{IReactorCore<twisted.internet.interfaces.IReactorCore>} for the
        benefit of type checkers.
    """
    from twisted.internet import reactor as globalReactor

    return cast(interfaces.IReactorCore, globalReactor)
def service(
    description: str,
    factory: interfaces.IProtocolFactory,
    reactor: Optional[interfaces.IReactorCore] = None,
) -> StreamServerEndpointService:
    """
    Build a stream server service from a string description.

    @param description: The description of the listening port, in the syntax
        described by L{twisted.internet.endpoints.serverFromString}.
    @type description: C{str}

    @param factory: The protocol factory which will build protocols for
        connections to this service.
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}

    @param reactor: The reactor handed to
        L{twisted.internet.endpoints.serverFromString} when building the
        endpoint.  If C{None} (the default), the global reactor is used.
    @type reactor: L{twisted.internet.interfaces.IReactorCore} or C{None}

    @rtype: C{twisted.application.service.IService}
    @return: the service corresponding to a description of a reliable stream
        server.

    @see: L{twisted.internet.endpoints.serverFromString}
    """
    activeReactor = _getReactor() if reactor is None else reactor
    endpoint = endpoints.serverFromString(activeReactor, description)
    result = StreamServerEndpointService(endpoint, factory)
    # NOTE(review): private flag on StreamServerEndpointService; presumably it
    # makes listen failures propagate to the caller instead of being handled
    # asynchronously -- confirm against twisted.application.internet.
    result._raiseSynchronously = True
    return result
def listen(
    description: str,
    factory: interfaces.IProtocolFactory,
    reactor: Optional[interfaces.IReactorCore] = None,
) -> interfaces.IListeningPort:
    """
    Listen on a port corresponding to a description.

    @param description: The description of the listening port, in the syntax
        described by L{twisted.internet.endpoints.serverFromString}.
    @type description: L{str}

    @param factory: The protocol factory which will build protocols on
        connection.
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}

    @param reactor: The reactor whose C{listen*} method is invoked.  If
        C{None} (the default), the global reactor is used, which matches the
        behaviour of this function before the parameter existed and mirrors
        the C{reactor} parameter of L{service}.
    @type reactor: L{twisted.internet.interfaces.IReactorCore} or C{None}

    @rtype: L{twisted.internet.interfaces.IListeningPort}
    @return: the port corresponding to a description of a reliable virtual
        circuit server.

    @see: L{twisted.internet.endpoints.serverFromString}
    """
    if reactor is None:
        # Resolve the global reactor lazily, exactly as service() does.
        reactor = _getReactor()
    name, args, kw = endpoints._parseServer(description, factory)
    # The parsed name selects the reactor method to call: the method looked up
    # is "listen" + name, invoked with the parsed positional and keyword
    # arguments.
    listenMethod = getattr(reactor, "listen" + name)
    return cast(interfaces.IListeningPort, listenMethod(*args, **kw))
__all__ = ["service", "listen"]
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | | 0755 | |
| runner | Folder | | 0755 | |
| test | Folder | | 0755 | |
| twist | Folder | | 0755 | |
| __init__.py | File | 129 B | 0644 | |
| app.py | File | 22.61 KB | 0644 | |
| internet.py | File | 36.29 KB | 0644 | |
| reactors.py | File | 2.28 KB | 0644 | |
| service.py | File | 11.47 KB | 0644 | |
| strports.py | File | 2.53 KB | 0644 | |