Examples

1. Hello World

Download example code from [here]

The same “hello world” example we already discussed here.

import asyncio, logging, traceback
from task_thread import TaskThread, reCreate, reSchedule,\
    delete, verbose, signals

Define TaskThread by subclassing

class MyThread(TaskThread):

Let’s define a rescheduling task that sleeps for one second and then reschedules itself

async def helloTask__(self):
    """Rescheduling task: sleep one second, log a greeting, reschedule itself.

    The handle returned by ``reSchedule`` is stored in ``self.tasks.hello``,
    i.e. the same slot that ``initVars__``, ``enter__`` and ``exit__`` use,
    so that ``exit__`` deletes the task that is currently scheduled.
    """
    try:
        await asyncio.sleep(1)
        self.logger.info("Hello from helloTask__ at %s", self.getId())
        self.tasks.hello = await reSchedule(self.helloTask__)
        return

    except asyncio.CancelledError:
        # cancellation ends the task without rescheduling
        self.logger.critical("helloTask__: cancelling")

    except Exception as e:
        # any other failure is logged and also ends the task
        self.logger.info("helloTask__: failed with '%s', traceback will follow", e)
        traceback.print_exc()

Remember to call the superclass constructor

def __init__(self, my_id=0, parent=None):
    """Run the superclass constructor, then remember this thread's id.

    :param my_id: identifier returned later by ``getId``
    :param parent: parent thread, passed on to the superclass constructor
    """
    super().__init__(parent=parent)
    self.my_id = my_id

Create the tasks under the self.tasks namespace & initialize them to None

def initVars__(self):
    """Declare the task slots under ``self.tasks``, all initially ``None``."""
    tasks = self.tasks
    tasks.hello = None  # slot for the handle of self.helloTask__

The helloTask__ is started for the first time when the thread is started:

@verbose
async def enter__(self):
    """Thread entry hook: log, then (re)create the hello task."""
    self.logger.info("enter__ :")
    hello = await reCreate(self.tasks.hello, self.helloTask__)
    self.tasks.hello = hello

Define what happens at thread exit: delete any running tasks

@verbose
async def exit__(self):
    """Thread exit hook: delete the hello task and log that exit finished.

    A subclass that owns further resources overrides this hook.
    """
    hello = self.tasks.hello
    self.tasks.hello = await delete(hello)
    self.logger.info("exit__ : finished")

Here we would define how signals from a parent are handled. In this example case nothing.

@verbose
async def signalHandler__(self, signal):
    """Handle a signal received by this thread; here it is only logged.

    :param signal: the received signal object
    """
    self.logger.info("signalHandler__ : got signal %s", signal)

def getId(self):
    """Return the identifier given to the constructor as ``my_id``."""
    return self.my_id

def getInfo(self):
    """Return a short human-readable tag containing this thread's id."""
    return f"<MyThread {self.my_id!s}>"

Let’s run MyThread! Terminate by pressing CTRL-C.

if __name__ == "__main__":
    # log everything emitted under the "MyThread" logger name
    logging.getLogger("MyThread").setLevel(logging.DEBUG)

    # no parent, this is the main thread
    thread = MyThread(my_id="main_thread", parent=None)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(thread.run())

2. Parent/Child Structure

Download example code from [here]

import asyncio, logging, traceback
from task_thread import TaskThread, reCreate, reSchedule,\
    delete, verbose, signals

Let’s define a signal for parent/child communication

class MessageSignal(signals.Signal):
    """Signal carrying a message together with the thread that sent it."""

    def __init__(self, origin, message):
        """
        :param origin: the sending thread; its ``getId()`` is used by ``__str__``
        :param message: the message payload
        """
        self.message = message
        self.origin = origin

    def __str__(self):
        sender_id = self.origin.getId()
        return f"<MessageSignal from {sender_id!s}>"

    def getMessage(self):
        """Return the message given to the constructor."""
        return self.message

We use the same MyThread as in the previous example. We will use it as a basis for further subclassing

class MyThread(TaskThread):
    # class body is identical to MyThread in the "1. Hello World" example
    # ... (omitted here)

Let’s create a child thread. This thread sends a message to its parent every 5 seconds:

class ChildThread(MyThread):

Define a rescheduling task that sends a message to its parent every 5 seconds

async def messageTask__(self):
    """Rescheduling task: every 5 seconds send a MessageSignal to the parent.

    The handle returned by ``reSchedule`` is stored in ``self.tasks.message``,
    i.e. the same slot that ``initVars__``, ``enter__`` and ``exit__`` use,
    so that ``exit__`` deletes the task that is currently scheduled.
    """
    try:
        await asyncio.sleep(5)
        self.logger.info("messageTask__: sending a message to parent")

        await self.sigFromChildToParent__(
            MessageSignal(origin = self,
            message = "hello from child " + str(self.getId())))
        self.tasks.message = await reSchedule(self.messageTask__)
        return

    except asyncio.CancelledError:
        # cancellation ends the task without rescheduling
        self.logger.critical("messageTask__: cancelling")

    except Exception as e:
        # any other failure is logged and also ends the task
        self.logger.info("messageTask__: failed with '%s', traceback will follow", e)
        traceback.print_exc()

Add the new task to self.tasks namespace

def initVars__(self):
    """Declare the task slots and the lock of this thread."""
    tasks = self.tasks
    tasks.hello = None    # slot for the handle of self.helloTask__
    tasks.message = None  # slot for the handle of self.messageTask__
    # lock kept under self.locks; the code shown in this example does not use it
    self.locks.message_lock = asyncio.Lock()

Start both tasks at thread start

@verbose
async def enter__(self):
    """Thread entry hook: (re)create the hello task, then the message task."""
    self.logger.info("enter__")
    hello = await reCreate(self.tasks.hello, self.helloTask__)
    self.tasks.hello = hello
    message = await reCreate(self.tasks.message, self.messageTask__)
    self.tasks.message = message

Kill both tasks at thread exit

@verbose
async def exit__(self):
    """Thread exit hook: delete the hello task, then the message task."""
    hello = self.tasks.hello
    self.tasks.hello = await delete(hello)
    message = self.tasks.message
    self.tasks.message = await delete(message)
    self.logger.info("exit__ : finished")

Now the parent thread that receives signals from the child:

class ParentThread(MyThread):

Start self.helloTask__ just like in the child thread.

The new bit here is, that we create and start a child thread at parent thread start.

After starting, the child thread is added to the parent’s registry using addChild.

You can experiment by adding more and more child threads.

@verbose
async def enter__(self):
    """Thread entry hook: start the hello task and one child thread.

    The child is run first and then added to this parent with ``addChild``.
    """
    self.logger.info("enter__")
    self.tasks.hello = await reCreate(self.tasks.hello, self.helloTask__)
    child = ChildThread(my_id="subthread", parent=self)
    await child.run()
    await self.addChild(child)

Define how we handle those signals coming from the running child thread:

@verbose
async def childsignalHandler__(self, signal, child):
    """Handle a signal coming from a child thread.

    Every signal is logged at debug level; a ``MessageSignal`` additionally
    has its message logged.  Other signal types are ignored.

    :param signal: the signal object sent by the child
    :param child: the child thread that sent it
    """
    self.logger.debug("childsignalHandler__ : got signal %s from child %s", signal, child.getId())
    if not isinstance(signal, MessageSignal):
        return
    self.logger.info(
        "Got message %s from child with id %s",
        signal.getMessage(),
        child.getId(),
    )

Please note that you can also start child threads “dynamically”, i.e. anywhere in your class’s internal methods or rescheduling tasks, not just in enter__.

That’s it! Nothing else is needed: when the parent thread terminates, it automagically terminates all of its child threads.

Run the program with:

if __name__ == "__main__":
    # log everything emitted under the parent's and the child's logger names
    for logger_name in ("ParentThread", "ChildThread"):
        logging.getLogger(logger_name).setLevel(logging.DEBUG)

    # no parent, this is the main thread
    thread = ParentThread(my_id="parent_thread", parent=None)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(thread.run())

Terminate by pressing CTRL-C.

3. TCP Server

  • Download server example code from [here]

  • Download client side code from [here]

import asyncio, logging, traceback
from task_thread import TaskThread, reCreate, reSchedule,\
    delete, MessageSignal, verbose, signals

Let’s create a TCP server application. The hierarchy looks like this:

MasterTCPServerThread
    TCPConnectionThread
    TCPConnectionThread
    ...

As always, start by subclassing TaskThread. First we create the child TCPConnectionThread.

class TCPConnectionThread(TaskThread):

Constructor takes as an argument stream reader and writer objects:

def __init__(self, parent = None, reader: asyncio.StreamReader = None, writer: asyncio.StreamWriter = None):
    """Store the stream objects of one client connection.

    :param parent: parent thread, passed on to the superclass constructor
    :param reader: stream reader of the connection (required)
    :param writer: stream writer of the connection (required)
    """
    super().__init__(parent = parent)
    assert(reader is not None)
    assert(writer is not None)
    self.reader = reader
    self.writer = writer
    # an IPv6 peer reports (host, port, flowinfo, scope_id), so take the first
    # two items instead of unpacking exactly two
    peer = self.writer.get_extra_info("peername")
    self.peername, self.peerport = peer[0], peer[1]

def getId(self):
    """Return ``id(self)`` as a string."""
    identity = id(self)
    return str(identity)

def getInfo(self):
    """Return a short tag with this thread's id and the peer it is connected to."""
    return f"<TCPConnectionThread {self.getId()!s} connected to {self.peername!s}>"

Re-scheduling tasks for reading and writing the client socket:

def initVars__(self):
    """Declare the task slots for reading and writing the client socket."""
    tasks = self.tasks
    tasks.read_socket = None
    tasks.write_socket = None

In this version we just read the socket, so start the corresponding re-scheduling task:

@verbose
async def enter__(self):
    """Thread entry hook: log the connection and (re)create the read task."""
    self.logger.info("enter__ : %s", self.getInfo())
    reader_task = await reCreate(self.tasks.read_socket, self.readSocket__)
    self.tasks.read_socket = reader_task

@verbose
async def exit__(self):
    """Thread exit hook: delete the read task, then the write task."""
    read_task = self.tasks.read_socket
    self.tasks.read_socket = await delete(read_task)
    write_task = self.tasks.write_socket
    self.tasks.write_socket = await delete(write_task)
    self.logger.debug("exit__: bye!")

Reads packet from the socket & re-schedules itself.

At failure, kill the whole TCPConnectionThread. This will inform the parent automagically & remove this child thread from the parent’s registry.

async def readSocket__(self):
    """Rescheduling task: read one chunk from the client socket.

    A non-empty chunk is logged and the task reschedules itself.  An empty
    chunk (the client closed the connection) or any failure stops this thread
    with ``self.stop()``.
    """
    try:
        try:
            # bounded read: read() without a size would only return at EOF,
            # i.e. after the client has closed the connection
            packet = await self.reader.read(1500)
        except Exception as e:
            self.logger.warning("readSocket__ : reading failed : '%s'", e)
            self.logger.info("readSocket__: tcp connection %s terminated", self.getInfo())
            await self.stop()
        else:
            if len(packet) > 0:
                # all good!  keep on reading = reschedule this
                self.logger.info("readSocket__: got packet %s", packet)
                self.tasks.read_socket = await reSchedule(self.readSocket__)
                return
            else:
                self.logger.info("readSocket__: client at %s closed connection", self.peername)
                await self.stop()

    except asyncio.CancelledError:
        self.logger.info("readSocket__ : cancelling %s", self.getInfo())

    except Exception as e:
        self.logger.warning("readSocket__: failed with '%s'", e)
        await self.stop()

Writing socket continuously (however, not used in this example):

async def writeSocket__(self, packet):
    """Write ``packet`` to the client socket; stop this thread if that fails.

    :param packet: the data handed to ``self.writer.write``
    """
    try:
        self.writer.write(packet)
        # await self.writer.drain() # this would flush
    except Exception as err:
        self.logger.warning("writeSocket__ : writing failed : '%s'", err)
        await self.stop()

Next, create the parent MasterTCPServerThread:

class MasterTCPServerThread(TaskThread):
    """Parent thread that owns the listening TCP server."""

    def __init__(self, parent = None, name = "thread", pause = 10, port = 5002, max_clients = 10):
        """
        :param parent: parent thread, passed on to the superclass constructor
        :param name: name shown by ``getInfo``
        :param pause: seconds to wait before retrying a failed server start
        :param port: TCP port to listen on
        :param max_clients: limit on the number of client connections
        """
        super().__init__(parent = parent)
        self.pause = pause
        self.port = port
        self.max_clients = max_clients
        self.name = name


    def initVars__(self):
        """Declare the server object and the server task slot."""
        self.server = None
        self.tasks.tcp_server = None


    def getId(self):
        """Return ``id(self)`` as a string."""
        return str(id(self))


    def getInfo(self):
        """Return a short tag containing this thread's name."""
        return "<MasterTCPServerThread %s>" % (self.name)


    @verbose
    async def enter__(self):
        """Thread entry hook: (re)create the task that starts the server."""
        self.logger.info("entry point")
        self.tasks.tcp_server = await reCreate(self.tasks.tcp_server, self.tcpServer__)


    @verbose
    async def exit__(self):
        """Thread exit hook: delete the server task and close the server."""
        self.tasks.tcp_server = await delete(self.tasks.tcp_server)
        if self.server is not None:
            # stop listening, otherwise the listening socket stays open
            self.server.close()
            self.server = None
        self.logger.debug("exit__: bye!")

Each and every child / tcp client sends its packets to the parent:

@verbose
async def childsignalHandler__(self, signal, child):
    """Handle a signal coming from a child connection thread.

    Every signal is logged at debug level; a ``MessageSignal`` additionally
    has its message logged.  Other signal types are ignored.

    :param signal: the signal object sent by the child
    :param child: the child thread that sent it
    """
    self.logger.debug("childsignalHandler__ : got signal %s from child %s", signal, child)
    if not isinstance(signal, MessageSignal):
        return
    self.logger.info(
        "Got message %s from child with id %s",
        signal.getMessage(),
        child.getId(),
    )

Keep on (re)creating the tcp server:

async def tcpServer__(self):
    """Task that starts the TCP server on ``self.port``.

    On success the server object is stored in ``self.server`` and the task
    ends.  If starting fails, the task waits ``self.pause`` seconds and
    reschedules itself.  On cancellation ``self.server`` is reset to ``None``.
    """
    try:
        self.server = await asyncio.start_server(self.handler__, "", self.port)

    except asyncio.CancelledError:
        self.logger.info("tcpServer__ : cancel")
        self.server = None

    except Exception as e:
        self.logger.warning("tcpServer__: failed with '%s'", str(e))
        self.logger.warning("tcpServer__: will try again in %s secs", self.pause)
        await asyncio.sleep(self.pause)
        self.tasks.tcp_server = await reSchedule(self.tcpServer__)

    else:
        self.logger.debug("tcpServer__ : new server waiting")

At a client connection, start a new TCPConnectionThread:

@verbose
async def handler__(self, reader, writer):
    """Connection callback of the TCP server: start a child thread per client.

    A connection arriving when ``self.max_clients`` children already exist is
    refused: its writer is closed and no child thread is created.

    :param reader: stream reader of the new connection
    :param writer: stream writer of the new connection
    """
    self.logger.debug("handler__ : new connection for %s", self.getInfo())

    # ">=" so that at most max_clients children exist at any time
    if len(self.children) >= self.max_clients:
        self.logger.warning("handler__ : max number of connections is %s", self.max_clients)
        # close the refused connection instead of leaving it open
        writer.close()
        return

    child_connection = TCPConnectionThread(reader = reader, writer = writer, parent = self)
    await child_connection.run()
    await self.addChild(child_connection)

Run it!

if __name__ == "__main__":
    # log everything emitted under the server's and the connections' logger names
    for logger_name in ("MasterTCPServerThread", "TCPConnectionThread"):
        logging.getLogger(logger_name).setLevel(logging.DEBUG)

    thread = MasterTCPServerThread(parent=None, name="TCPServer")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(thread.run())

While it’s running, start one or several client connections using the provided client code.

4. Data Streaming TCP Server

  • Download example code from [here]

  • Download client side code from [here]

A bit more complex TCP server with intercom and dataframe reconstruction:

  • TCP server starts

  • Client connects to TCP server

  • Server sends a hello json message to the client

  • Client starts streaming byte payload to the server

  • Server reconstructs byte blobs / dataframes from the streaming byte payload

Use the provided client-side code to play around with this.

import asyncio, logging, json
import numpy as np
import traceback
from task_thread import TaskThread, reCreate, reSchedule,\
    delete, verbose, signals

INT_NBYTES = 4  # size in bytes of the big-endian length header


def json2bytes(dic):
    """Serialize dic into UTF-8 json bytes, prefixed with their length.

    The prefix is the number of json bytes as an INT_NBYTES long big-endian
    integer.
    """
    body = json.dumps(dic).encode("utf-8")
    header = len(body).to_bytes(INT_NBYTES, "big")
    return header + body


class PayloadSignal(signals.Signal):
    """Generic signal that carries a python object as its payload."""

    def __init__(self, payload):
        """
        :param payload: the carried object; ``__str__`` takes its ``len()``
        """
        self.payload = payload

    def __str__(self):
        nbytes = len(self.payload)
        return "<PayloadSignal with %i bytes>" % (nbytes)

    def getData(self):
        """Return the payload given to the constructor."""
        return self.payload



class TCPConnectionThread(TaskThread):
    """Child thread serving one TCP client connection.

    Sends a one-time json hello message to the client, then keeps reading the
    client's byte stream and reconstructs length-prefixed blobs from it.  Each
    complete blob is sent to the parent as a ``PayloadSignal``.
    """

    def __init__(self, parent = None, reader = None, writer = None):
        """
        :param parent: parent thread, passed on to the superclass constructor
        :param reader: stream reader of the client connection
        :param writer: stream writer of the client connection
        """
        super().__init__(parent = parent)
        self.reader = reader
        self.writer = writer
        # an IPv6 peer reports (host, port, flowinfo, scope_id), so take the
        # first two items instead of unpacking exactly two
        peer = self.writer.get_extra_info("peername")
        self.peername, self.peerport = peer[0], peer[1]

    def getId(self):
        """Return ``id(self)`` as a string."""
        return str(id(self))

    def getInfo(self):
        """Return a short tag with this thread's id and the connected peer."""
        return "<TCPConnectionThread %s connected to %s>" % (self.getId(), self.peername)


    def initVars__(self):
        """Declare the task slots for reading and writing the client socket."""
        self.tasks.read_socket = None
        self.tasks.write_socket = None


    @verbose
    async def enter__(self):
        """Thread entry hook: start the task that sends the hello message."""
        self.logger.info("enter__ : %s", self.getInfo())
        self.tasks.write_socket = await reCreate(self.tasks.write_socket, self.writeSocket__)


    @verbose
    async def exit__(self):
        """Thread exit hook: delete the read task, then the write task."""
        self.tasks.read_socket = await delete(self.tasks.read_socket)
        self.tasks.write_socket = await delete(self.tasks.write_socket)
        self.logger.debug("exit__: bye!")


    async def readSocket__(self):
        """Rescheduling task: read one chunk and feed it to ``handlePacket__``.

        An empty chunk (the client closed the connection) or any failure stops
        this thread with ``self.stop()``.
        """
        try:
            try:
                # a word of warning here: try to read as many bytes as you can
                # with the asyncio StreamReader instance (here self.reader) per
                # re-scheduling, in order to keep the rescheduling frequency
                # reasonable
                packet = await self.reader.read(1500)
            except Exception as e:
                self.logger.warning("readSocket__ : reading failed : '%s'", e)
                self.logger.info("readSocket__: tcp connection %s terminated", self.getInfo())
                await self.stop()
            else:
                if len(packet) > 0:
                    # all good!  keep on reading = reschedule this
                    self.logger.debug("readSocket__: got packet with size %s", len(packet))
                    # send the payload to parent:
                    await self.handlePacket__(packet)
                    # you can re-schedule with a moderate frequency, say, 100
                    # times per second, but try to keep the re-scheduling
                    # frequency "sane"
                    self.tasks.read_socket = await reSchedule(self.readSocket__)
                    return
                else:
                    self.logger.info("readSocket__: client at %s closed connection", self.peername)
                    await self.stop()

        except asyncio.CancelledError:
            self.logger.info("readSocket__ : cancelling %s", self.getInfo())

        except Exception as e:
            self.logger.warning("readSocket__: failed with '%s'", e)
            await self.stop()


    def resetPacketState__(self, clearbuf = False):
        """Reset the blob parser so that it expects a length header next.

        :param clearbuf: if True, also empty the receive buffer ``self.buf``
        """
        self.left = INT_NBYTES
        self.len = 0
        self.header = True
        if clearbuf:
            self.buf = bytes(0)


    async def handlePacket__(self, packet):
        """Reconstruct length-prefixed blobs from the incoming byte stream.

        ``packet`` is appended to ``self.buf``.  Every complete blob found in
        the buffer (an INT_NBYTES big-endian length header followed by that
        many bytes) is sent to the parent as a ``PayloadSignal`` carrying the
        bytes as a ``np.uint8`` array.  Incomplete data stays in the buffer
        for the next call.

        :param packet: newly received bytes, or None to only process what is
            already buffered
        """
        if packet is not None:
            self.buf += packet

        # a loop instead of recursion: one packet may hold many small blobs
        while True:
            if self.header:
                if len(self.buf) < INT_NBYTES:
                    return # header not complete yet
                self.len = int.from_bytes(self.buf[0:INT_NBYTES], "big")
                self.header = False # we got the header info (length)

            end = INT_NBYTES + self.len
            if len(self.buf) < end:
                return # blob not complete yet

            # correct amount of bytes have been obtained
            payload = np.frombuffer(self.buf[INT_NBYTES:end], dtype=np.uint8)
            await self.sigFromChildToParent__(PayloadSignal(
                payload = payload))
            # keep the leftover (possibly empty) for the next blob
            self.buf = self.buf[end:]
            self.resetPacketState__()


    async def writeSocket__(self):
        """Send a one-time hello message, then start reading from the client.

        If writing fails, this thread is stopped with ``self.stop()``.
        """
        try:
            packet = json2bytes(
                {
                    "message" : "hello!"
                })
            try:
                self.writer.write(packet)
                # await self.writer.drain() # this would maybe flush
            except Exception as e:
                self.logger.warning("writeSocket__ : writing failed : '%s'", e)
                await self.stop()
            else:
                # sending info to the client was successful, let's start
                # reading payload from the client
                self.resetPacketState__(clearbuf = True)
                self.tasks.read_socket = await reCreate(self.tasks.read_socket, self.readSocket__)

        except asyncio.CancelledError:
            self.logger.info("writeSocket__ : cancelling %s", self.getInfo())

        except Exception as e:
            self.logger.warning("writeSocket__: failed with '%s'", e)
            await self.stop()


class MasterTCPServerThread(TaskThread):
    """Parent thread that owns the TCP server and one child per client."""

    def __init__(self, parent = None, name = "thread", pause = 10, port = 5002, max_clients = 10):
        """
        :param parent: parent thread, passed on to the superclass constructor
        :param name: name shown by ``getInfo``
        :param pause: seconds to wait before retrying a failed server start
        :param port: TCP port to listen on
        :param max_clients: limit on the number of client connections
        """
        super().__init__(parent = parent)
        self.pause = pause
        self.port = port
        self.max_clients = max_clients
        self.name = name


    def initVars__(self):
        """Declare the server object and the server task slot."""
        self.server = None
        self.tasks.tcp_server = None


    def getId(self):
        """Return ``id(self)`` as a string."""
        return str(id(self))


    def getInfo(self):
        """Return a short tag containing this thread's name."""
        return "<MasterTCPServerThread %s>" % (self.name)


    @verbose
    async def enter__(self):
        """Thread entry hook: (re)create the task that starts the server."""
        self.logger.info("entry point")
        self.tasks.tcp_server = await reCreate(self.tasks.tcp_server, self.tcpServer__)
        # now the tasks runs independently


    @verbose
    async def exit__(self):
        """Thread exit hook: delete the server task and close the server."""
        self.tasks.tcp_server = await delete(self.tasks.tcp_server)
        if self.server is not None:
            # stop listening, otherwise the listening socket stays open
            self.server.close()
            self.server = None
        self.logger.debug("exit__: bye!")


    @verbose
    async def childsignalHandler__(self, signal, child):
        """Handle a signal coming from a child connection thread.

        Every signal is logged at debug level; a ``PayloadSignal`` is
        additionally logged at info level.  Other signal types are ignored.

        :param signal: the signal object sent by the child
        :param child: the child thread that sent it
        """
        self.logger.debug("childsignalHandler__ : got signal %s from child %s", signal, child)
        if isinstance(signal, PayloadSignal):
            self.logger.info("Got %s from child with id %s", str(signal), child.getId())

    # *** custom rescheduling tasks ***

    async def tcpServer__(self):
        """Task that starts the TCP server on ``self.port``.

        On success the server object is stored in ``self.server`` and the task
        ends.  If starting fails, the task waits ``self.pause`` seconds and
        reschedules itself.  On cancellation ``self.server`` is reset to None.
        """
        try:
            self.server = await asyncio.start_server(self.handler__, "", self.port)

        except asyncio.CancelledError:
            self.logger.info("tcpServer__ : cancel")
            self.server = None

        except Exception as e:
            self.logger.warning("tcpServer__: failed with '%s'", str(e))
            self.logger.warning("tcpServer__: will try again in %s secs", self.pause)
            await asyncio.sleep(self.pause)
            self.tasks.tcp_server = await reSchedule(self.tcpServer__)

        else:
            self.logger.debug("tcpServer__ : new server waiting")


    # *** internal ***

    @verbose
    async def handler__(self, reader, writer):
        """Connection callback of the TCP server: start a child per client.

        A connection arriving when ``self.max_clients`` children already exist
        is refused: its writer is closed and no child thread is created.

        :param reader: stream reader of the new connection
        :param writer: stream writer of the new connection
        """
        self.logger.debug("handler__ : new connection for %s", self.getInfo())

        # ">=" so that at most max_clients children exist at any time
        if len(self.children) >= self.max_clients:
            self.logger.warning("handler__ : max number of connections is %s", self.max_clients)
            # close the refused connection instead of leaving it open
            writer.close()
            return

        child_connection = TCPConnectionThread(reader = reader, writer = writer, parent = self)
        await child_connection.run()
        await self.addChild(child_connection)


if __name__ == "__main__":
    # switch to logging.DEBUG for more detail
    for logger_name in ("MasterTCPServerThread", "TCPConnectionThread"):
        logging.getLogger(logger_name).setLevel(logging.INFO)

    thread = MasterTCPServerThread(parent=None, name="TCPServer")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(thread.run())