Examples
1. Hello World
Download example code from [here]
The same “hello world” example we already discussed here.
import asyncio, logging, traceback
from task_thread import TaskThread, reCreate, reSchedule,\
delete, verbose, signals
Define TaskThread by subclassing
class MyThread(TaskThread):
Let’s define a rescheduling task that sleeps for one second & then reschedules itself
async def helloTask__(self):
    """Log a greeting once per second by rescheduling itself.

    Sleeps for one second, logs a message and stores the rescheduled task
    handle in ``self.tasks.hello``. That is the slot ``initVars__``,
    ``enter__`` and ``exit__`` use, so ``exit__`` deletes the task that is
    actually pending.
    """
    try:
        await asyncio.sleep(1)
        self.logger.info("Hello from helloTask__ at %s", self.getId())
        # Must be the same attribute that enter__/exit__ manage; writing to
        # a differently named slot would leave the pending task untracked.
        self.tasks.hello = await reSchedule(self.helloTask__)
        return
    except asyncio.CancelledError:
        # Cancellation is the normal way this task ends; do not re-schedule.
        self.logger.critical("helloTask__: cancelling")
    except Exception as e:
        self.logger.info("helloTask__: failed with '%s', traceback will follow", e)
        traceback.print_exc()
Remember to call the superclass constructor
def __init__(self, my_id=0, parent=None):
    """Initialise the thread.

    :param my_id: identifier returned by ``getId``
    :param parent: parent thread, or None for a top-level thread
    """
    # The superclass constructor must run first.
    super().__init__(parent=parent)
    self.my_id = my_id
Create the tasks under the self.tasks namespace & initialize them to None
def initVars__(self):
    """Declare the task slots of this thread, initially empty."""
    # slot for the task running self.helloTask__
    self.tasks.hello = None
The helloTask__
is started for the first time when the thread is started:
@verbose
async def enter__(self):
    """Thread entry point: start the hello task for the first time."""
    self.logger.info("enter__ :")
    current = self.tasks.hello
    self.tasks.hello = await reCreate(current, self.helloTask__)
Define what happens at thread exit: delete any running tasks
@verbose
async def exit__(self):
    """Thread exit point: delete the hello task.

    Subclasses holding sockets, databases, etc. should override this and
    release them here.
    """
    pending = self.tasks.hello
    self.tasks.hello = await delete(pending)
    self.logger.info("exit__ : finished")
Here we would define how signals from a parent are handled. In this example case nothing.
@verbose
async def signalHandler__(self, signal):
    """Handle a signal from the parent; this example only logs it."""
    self.logger.info("signalHandler__ : got signal %s", signal)
def getId(self):
    """Return the identifier given to the constructor as ``my_id``."""
    return self.my_id
def getInfo(self):
    """Return a short human-readable description of this thread."""
    return f"<MyThread {self.my_id!s}>"
Let’s run MyThread! Terminate by pressing CTRL-C.
if __name__ == "__main__":
    # Verbose logging for the example thread class.
    logging.getLogger("MyThread").setLevel(logging.DEBUG)
    # No parent: this is the main thread.
    thread = MyThread(my_id="main_thread", parent=None)
    # Runs until the thread finishes; terminate with CTRL-C.
    asyncio.get_event_loop().run_until_complete(thread.run())
2. Parent/Child Structure
Download example code from [here]
import asyncio, logging, traceback
from task_thread import TaskThread, reCreate, reSchedule,\
delete, verbose, signals
Let’s define a signal for parent/child communication
class MessageSignal(signals.Signal):
    """Signal carrying a message together with the thread that sent it."""

    def __init__(self, origin, message):
        """Store the sending thread (``origin``) and the ``message``."""
        self.origin = origin
        self.message = message

    def __str__(self):
        sender_id = str(self.origin.getId())
        return f"<MessageSignal from {sender_id}>"

    def getMessage(self):
        """Return the message carried by this signal."""
        return self.message
We use the same MyThread
as in the previous example.
We will use it as a basis for further subclassing
class MyThread(TaskThread):
    # Same definition as the MyThread class of the "Hello World" example;
    # the body is elided here.
    # ...
Let’s create a child thread. This thread sends a message to its parent every 5 seconds:
class ChildThread(MyThread):
Define a rescheduling task that sends a message to its parent every 5 seconds
async def messageTask__(self):
    """Send a MessageSignal to the parent every 5 seconds.

    Sleeps for five seconds, sends the signal and stores the rescheduled
    task handle in ``self.tasks.message``. That is the slot ``initVars__``,
    ``enter__`` and ``exit__`` use, so ``exit__`` deletes the task that is
    actually pending.
    """
    try:
        await asyncio.sleep(5)
        self.logger.info("messageTask__: sending a message to parent")
        await self.sigFromChildToParent__(
            MessageSignal(
                origin=self,
                message="hello from child " + str(self.getId()),
            )
        )
        # Must be the same attribute that enter__/exit__ manage; writing to
        # a differently named slot would leave the pending task untracked.
        self.tasks.message = await reSchedule(self.messageTask__)
        return
    except asyncio.CancelledError:
        # Cancellation is the normal way this task ends; do not re-schedule.
        self.logger.critical("messageTask__: cancelling")
    except Exception as e:
        self.logger.info("messageTask__: failed with '%s', traceback will follow", e)
        traceback.print_exc()
Add the new task to self.tasks
namespace
def initVars__(self):
    """Declare this thread's task slots (initially empty) and its locks."""
    task_slots = self.tasks
    task_slots.hello = None    # runs self.helloTask__
    task_slots.message = None  # runs self.messageTask__
    # locks
    self.locks.message_lock = asyncio.Lock()
Start both tasks at thread start
@verbose
async def enter__(self):
    """Thread entry point: start the hello task and the message task."""
    self.logger.info("enter__")
    task_slots = self.tasks
    task_slots.hello = await reCreate(task_slots.hello, self.helloTask__)
    task_slots.message = await reCreate(task_slots.message, self.messageTask__)
Kill both tasks at thread exit
@verbose
async def exit__(self):
    """Thread exit point: delete the hello task and the message task."""
    task_slots = self.tasks
    task_slots.hello = await delete(task_slots.hello)
    task_slots.message = await delete(task_slots.message)
    self.logger.info("exit__ : finished")
Now the parent thread that receives signals from the child:
class ParentThread(MyThread):
Start self.helloTask__
just like in the child thread.
The new bit here is, that we create and start a child thread at parent thread start.
After starting, the child thread is added to the parent’s registry using addChild
.
You can experiment by adding more and more child threads.
@verbose
async def enter__(self):
    """Thread entry point: start the hello task, then start one child thread.

    The child is run first and only then added to this thread's registry
    with ``addChild``.
    """
    self.logger.info("enter__")
    self.tasks.hello = await reCreate(self.tasks.hello, self.helloTask__)
    child = ChildThread(my_id="subthread", parent=self)
    await child.run()
    await self.addChild(child)
Define how we handle those signals coming from the running child thread:
@verbose
async def childsignalHandler__(self, signal, child):
    """Handle a signal sent by a running child thread.

    MessageSignal instances are logged with their message; any other signal
    is ignored after the debug log line.
    """
    self.logger.debug(
        "childsignalHandler__ : got signal %s from child %s", signal, child.getId()
    )
    if not isinstance(signal, MessageSignal):
        return
    self.logger.info(
        "Got message %s from child with id %s", signal.getMessage(), child.getId()
    )
Please note that you can also start child threads “dynamically”, i.e. anywhere in your class’s internal methods/rescheduling tasks, not just at enter__
.
That’s it! Nothing else is needed: when the parent thread terminates, it automagically terminates all of its child threads.
Run the program with:
if __name__ == "__main__":
    loglev = logging.DEBUG
    # Same verbosity for the parent and the child thread classes.
    for logger_name in ("ParentThread", "ChildThread"):
        logging.getLogger(logger_name).setLevel(loglev)
    # No parent: this is the main thread.
    thread = ParentThread(my_id="parent_thread", parent=None)
    asyncio.get_event_loop().run_until_complete(thread.run())
Terminate by pressing CTRL-C.
3. TCP Server
import asyncio, logging, traceback
from task_thread import TaskThread, reCreate, reSchedule,\
delete, MessageSignal, verbose, signals
Let’s create a TCP server application. The hierarchy looks like this:
MasterTCPServerThread
TCPConnectionThread
TCPConnectionThread
...
As always, start by subclassing TaskThread. First we create the child TCPConnectionThread
.
class TCPConnectionThread(TaskThread):
Constructor takes as an argument stream reader and writer objects:
def __init__(self, parent=None, reader: asyncio.StreamReader = None,
             writer: asyncio.StreamWriter = None):
    """Wrap one accepted TCP connection.

    :param parent: parent thread
    :param reader: stream reader of the connection (required)
    :param writer: stream writer of the connection (required)
    """
    super().__init__(parent=parent)
    assert reader is not None
    assert writer is not None
    self.reader = reader
    self.writer = writer
    # "peername" is (host, port) for IPv4 but (host, port, flowinfo,
    # scope_id) for IPv6, and None when the transport has no peer info, so
    # a blind 2-tuple unpack would raise for those connections.
    peer = self.writer.get_extra_info("peername")
    if peer is None:
        self.peername, self.peerport = None, None
    else:
        self.peername, self.peerport = peer[0], peer[1]
def getId(self):
    """Return an identifier derived from this object's ``id()``."""
    object_id = id(self)
    return str(object_id)
def getInfo(self):
    """Return a description containing this thread's id and the peer name."""
    return f"<TCPConnectionThread {self.getId()!s} connected to {self.peername!s}>"
Re-scheduling tasks for reading and writing the client socket:
def initVars__(self):
    """Declare the socket read and write task slots, initially empty."""
    task_slots = self.tasks
    task_slots.read_socket = None
    task_slots.write_socket = None
In this version we just read the socket, so start the corresponding re-scheduling task:
@verbose
async def enter__(self):
    """Thread entry point: start the socket-reading task."""
    self.logger.info("enter__ : %s", self.getInfo())
    current = self.tasks.read_socket
    self.tasks.read_socket = await reCreate(current, self.readSocket__)
@verbose
async def exit__(self):
    """Thread exit point: delete the read and write socket tasks."""
    task_slots = self.tasks
    task_slots.read_socket = await delete(task_slots.read_socket)
    task_slots.write_socket = await delete(task_slots.write_socket)
    self.logger.debug("exit__: bye!")
Reads packet from the socket & re-schedules itself.
At failure, kill the whole TCPConnectionThread
. This will inform the parent automagically & remove
this child thread from the parent’s registry.
async def readSocket__(self):
    """Read one chunk from the client socket and re-schedule itself.

    On a read failure or when the client closes the connection (empty
    read), the whole thread is stopped with ``self.stop()``.
    """
    try:
        try:
            # Bounded read: returns as soon as some data is available.
            # read() without a size waits for EOF, i.e. nothing would be
            # delivered until the client disconnects, and the stream buffer
            # would grow without limit in the meantime.
            packet = await self.reader.read(1500)
        except Exception as e:
            self.logger.warning("readSocket__ : reading failed : '%s'", e)
            self.logger.info("readSocket__: tcp connection %s terminated", self.getInfo())
            await self.stop()
        else:
            if len(packet) > 0:
                # all good! keep on reading = reschedule this
                self.logger.info("readSocket__: got packet %s", packet)
                self.tasks.read_socket = await reSchedule(self.readSocket__)
                return
            else:
                # An empty read means EOF: the peer closed the connection.
                self.logger.info("readSocket__: client at %s closed connection", self.peername)
                await self.stop()
    except asyncio.CancelledError:
        self.logger.info("readSocket__ : cancelling %s", self.getInfo())
    except Exception as e:
        self.logger.warning("readSocket__: failed with '%s'", e)
        await self.stop()
Writing socket continuously (however, not used in this example):
async def writeSocket__(self, packet):
    """Write ``packet`` to the client socket; stop the thread on failure.

    The write is not followed by ``drain()``, so the data is only queued
    on the writer.
    """
    try:
        self.writer.write(packet)
    except Exception as err:
        self.logger.warning("writeSocket__ : writing failed : '%s'", err)
        await self.stop()
Next, create the parent MasterTCPServerThread
:
class MasterTCPServerThread(TaskThread):
def __init__(self, parent=None, name="thread", pause=10, port=5002, max_clients=10):
    """Configure the TCP server thread.

    :param parent: parent thread, or None for a top-level thread
    :param name: name shown by ``getInfo``
    :param pause: seconds to wait before retrying a failed server start
    :param port: TCP port to listen on
    :param max_clients: connection limit checked in ``handler__``
    """
    super().__init__(parent=parent)
    self.name = name
    self.port = port
    self.pause = pause
    self.max_clients = max_clients
def initVars__(self):
    """Reset the server handle and declare the server task slot."""
    self.tasks.tcp_server = None
    self.server = None
def getId(self):
    """Return an identifier derived from this object's ``id()``."""
    object_id = id(self)
    return str(object_id)
def getInfo(self):
    """Return a description containing the server's name."""
    label = self.name
    return "<MasterTCPServerThread %s>" % label
@verbose
async def enter__(self):
    """Thread entry point: start the task that creates the TCP server."""
    self.logger.info("entry point")
    current = self.tasks.tcp_server
    self.tasks.tcp_server = await reCreate(current, self.tcpServer__)
@verbose
async def exit__(self):
    """Thread exit point: delete the server task and stop listening."""
    self.tasks.tcp_server = await delete(self.tasks.tcp_server)
    # Deleting the task does not close a server it already created, so
    # without this the listening socket would stay open after thread exit.
    if self.server is not None:
        self.server.close()
        self.server = None
    self.logger.debug("exit__: bye!")
Each and every child / tcp client sends its packets to the parent:
@verbose
async def childsignalHandler__(self, signal, child):
    """Handle a signal sent by a child connection thread.

    MessageSignal instances are logged with their message; any other signal
    is ignored after the debug log line.
    """
    self.logger.debug("childsignalHandler__ : got signal %s from child %s", signal, child)
    if not isinstance(signal, MessageSignal):
        return
    self.logger.info(
        "Got message %s from child with id %s", signal.getMessage(), child.getId()
    )
Keep on (re)creating the tcp server:
async def tcpServer__(self):
    """Create the TCP server; on failure, retry after ``self.pause`` seconds.

    The server listens on ``self.port`` and passes each new connection to
    ``self.handler__``.
    """
    try:
        self.server = await asyncio.start_server(self.handler__, "", self.port)
    except asyncio.CancelledError:
        # The old message contained a "%i" placeholder with no argument,
        # so the literal "%i" was written to the log.
        self.logger.info("tcpServer__: cancel")
        self.server = None
    except Exception as e:
        self.logger.warning("tcpServer__: failed with '%s'", str(e))
        self.logger.warning("tcpServer__: will try again in %s secs", self.pause)
        await asyncio.sleep(self.pause)
        self.tasks.tcp_server = await reSchedule(self.tcpServer__)
    else:
        self.logger.debug("tcpServer__ : new server waiting")
At a client connection, start a new TCPConnectionThread
:
@verbose
async def handler__(self, reader, writer):
    """Handle a new client connection by starting a TCPConnectionThread.

    When ``self.max_clients`` children already exist, the connection is
    refused and closed.

    :param reader: stream reader of the new connection
    :param writer: stream writer of the new connection
    """
    self.logger.debug("handler__ : new connection for %s", self.getInfo())
    # ">=" rather than ">": with ">" one connection more than max_clients
    # was accepted.
    if len(self.children) >= self.max_clients:
        self.logger.warning("handler__ : max number of connections is %s", self.max_clients)
        # Close the refused connection so neither side keeps a dangling socket.
        writer.close()
        return
    child_connection = TCPConnectionThread(reader=reader, writer=writer, parent=self)
    await child_connection.run()
    await self.addChild(child_connection)
Run it!
if __name__ == "__main__":
    loglev = logging.DEBUG
    # Same verbosity for the server and the connection thread classes.
    for logger_name in ("MasterTCPServerThread", "TCPConnectionThread"):
        logging.getLogger(logger_name).setLevel(loglev)
    thread = MasterTCPServerThread(parent=None, name="TCPServer")
    asyncio.get_event_loop().run_until_complete(thread.run())
While it’s running, start one or several client connections using the provided client code.
4. Data Streaming TCP Server
A bit more complex TCP server with intercom and dataframe reconstruction:
TCP server starts
Client connects to TCP server
Server sends a hello json message to the client
Client starts streaming byte payload to the server
Server reconstructs byte blobs / dataframes from the streaming byte payload
Use the provided client-side code to play around with this.
import asyncio, logging, json
import numpy as np
import traceback
from task_thread import TaskThread, reCreate, reSchedule,\
delete, verbose, signals
INT_NBYTES = 4


def json2bytes(dic):
    """Serialise ``dic`` to UTF-8 JSON, prefixed with its byte length.

    The length prefix is an ``INT_NBYTES``-byte big-endian integer.
    """
    body = json.dumps(dic).encode("utf-8")
    length_prefix = len(body).to_bytes(INT_NBYTES, "big")
    return length_prefix + body
class PayloadSignal(signals.Signal):
    """A generic signal carrying a Python object as its payload."""

    def __init__(self, payload):
        """Store ``payload``; ``__str__`` requires it to support ``len()``."""
        self.payload = payload

    def __str__(self):
        size = len(self.payload)
        return "<PayloadSignal with %i bytes>" % size

    def getData(self):
        """Return the payload carried by this signal."""
        return self.payload
class TCPConnectionThread(TaskThread):
def __init__(self, parent=None, reader=None, writer=None):
    """Wrap one accepted TCP connection.

    :param parent: parent thread
    :param reader: stream reader of the connection
    :param writer: stream writer of the connection; must not be None, since
        the peer address is read from it here
    """
    super().__init__(parent=parent)
    self.reader = reader
    self.writer = writer
    # "peername" is (host, port) for IPv4 but (host, port, flowinfo,
    # scope_id) for IPv6, and None when the transport has no peer info, so
    # a blind 2-tuple unpack would raise for those connections.
    peer = self.writer.get_extra_info("peername")
    if peer is None:
        self.peername, self.peerport = None, None
    else:
        self.peername, self.peerport = peer[0], peer[1]
def getId(self):
    """Return an identifier derived from this object's ``id()``."""
    object_id = id(self)
    return str(object_id)
def getInfo(self):
    """Return a description containing this thread's id and the peer name."""
    return f"<TCPConnectionThread {self.getId()!s} connected to {self.peername!s}>"
def initVars__(self):
    """Declare the socket read and write task slots, initially empty."""
    task_slots = self.tasks
    task_slots.read_socket = None
    task_slots.write_socket = None
@verbose
async def enter__(self):
    """Thread entry point: start the task that sends the hello message.

    Reading is started later by ``writeSocket__`` after a successful write.
    """
    self.logger.info("enter__ : %s", self.getInfo())
    current = self.tasks.write_socket
    self.tasks.write_socket = await reCreate(current, self.writeSocket__)
@verbose
async def exit__(self):
    """Thread exit point: delete the read and write socket tasks."""
    task_slots = self.tasks
    task_slots.read_socket = await delete(task_slots.read_socket)
    task_slots.write_socket = await delete(task_slots.write_socket)
    self.logger.debug("exit__: bye!")
async def readSocket__(self):
    """Read up to 1500 bytes from the client, process them and re-schedule.

    Received bytes are passed to ``handlePacket__``. On a read failure or
    when the client closes the connection (empty read), the thread is
    stopped with ``self.stop()``.
    """
    try:
        try:
            # A word of warning: try to read as many bytes as you can from
            # the asyncio StreamReader (self.reader) per re-scheduling, in
            # order to keep the rescheduling frequency reasonable.
            chunk = await self.reader.read(1500)
        except Exception as err:
            self.logger.warning("readSocket__ : reading failed : '%s'", err)
            self.logger.info("readSocket__: tcp connection %s terminated", self.getInfo())
            await self.stop()
            return

        if len(chunk) == 0:
            self.logger.info("readSocket__: client at %s closed connection", self.peername)
            await self.stop()
            return

        # all good! keep on reading = reschedule this
        self.logger.debug("readSocket__: got packet with size %s", len(chunk))
        # reconstruct payloads; complete ones are sent to the parent
        await self.handlePacket__(chunk)
        # You can re-schedule with a moderate frequency, say, 100 times per
        # second, but try to keep the re-scheduling frequency "sane".
        self.tasks.read_socket = await reSchedule(self.readSocket__)
        return
    except asyncio.CancelledError:
        self.logger.info("readSocket__ : cancelling %s", self.getInfo())
    except Exception as err:
        self.logger.warning("readSocket__: failed with '%s'", err)
        await self.stop()
def resetPacketState__(self, clearbuf=False):
    """Reset the frame-parsing state so the next header is expected.

    :param clearbuf: when true, also replace the receive buffer with an
        empty bytes object
    """
    self.header = True
    self.len = 0
    self.left = INT_NBYTES
    if not clearbuf:
        return
    self.buf = bytes(0)
async def handlePacket__(self, packet):
    """Reassemble length-prefixed blobs from the incoming byte stream.

    Each blob is preceded by an ``INT_NBYTES``-byte big-endian length.
    Every complete blob in the buffer is sent to the parent as a
    ``PayloadSignal`` holding a ``numpy.uint8`` array. Incomplete data
    stays in ``self.buf`` until more bytes arrive.

    :param packet: newly received bytes, or None to only process what is
        already buffered
    """
    if packet is not None:
        self.buf += packet

    # Loop rather than recurse: one buffer can hold many small blobs, and a
    # recursive call per header and per blob could hit the recursion limit.
    while True:
        if self.header:
            if len(self.buf) < INT_NBYTES:
                return  # header not complete yet
            self.len = int.from_bytes(self.buf[:INT_NBYTES], "big")
            self.header = False  # we got the header info (length)

        end = INT_NBYTES + self.len
        if len(self.buf) < end:
            return  # blob not complete yet

        # The correct amount of bytes has been obtained. This also covers a
        # zero-length blob whose header arrived on its own.
        payload = np.frombuffer(self.buf[INT_NBYTES:end], dtype=np.uint8)
        await self.sigFromChildToParent__(PayloadSignal(payload=payload))

        # Keep any leftover for the next blob and expect a header again.
        self.buf = self.buf[end:]
        self.resetPacketState__()
async def writeSocket__(self):
    """Send a one-time hello message, then start reading from the client.

    If writing fails, the thread is stopped and reading is never started.
    """
    try:
        hello = json2bytes({"message": "hello!"})
        try:
            # Not followed by drain(), so the data is only queued on the writer.
            self.writer.write(hello)
        except Exception as err:
            self.logger.warning("writeSocket__ : writing failed : '%s'", err)
            await self.stop()
            return

        # Sending info to the client was successful, so start reading
        # payload from the client.
        self.resetPacketState__(clearbuf=True)
        self.tasks.read_socket = await reCreate(self.tasks.read_socket, self.readSocket__)
    except asyncio.CancelledError:
        self.logger.info("writeSocket__ : cancelling %s", self.getInfo())
    except Exception as err:
        self.logger.warning("writeSocket__: failed with '%s'", err)
        await self.stop()
class MasterTCPServerThread(TaskThread):
def __init__(self, parent=None, name="thread", pause=10, port=5002, max_clients=10):
    """Configure the TCP server thread.

    :param parent: parent thread, or None for a top-level thread
    :param name: name shown by ``getInfo``
    :param pause: seconds to wait before retrying a failed server start
    :param port: TCP port to listen on
    :param max_clients: connection limit checked in ``handler__``
    """
    super().__init__(parent=parent)
    self.name = name
    self.port = port
    self.pause = pause
    self.max_clients = max_clients
def initVars__(self):
    """Reset the server handle and declare the server task slot."""
    self.tasks.tcp_server = None
    self.server = None
def getId(self):
    """Return an identifier derived from this object's ``id()``."""
    object_id = id(self)
    return str(object_id)
def getInfo(self):
    """Return a description containing the server's name."""
    label = self.name
    return "<MasterTCPServerThread %s>" % label
@verbose
async def enter__(self):
    """Thread entry point: start the task that creates the TCP server."""
    self.logger.info("entry point")
    current = self.tasks.tcp_server
    # from here on the task runs independently
    self.tasks.tcp_server = await reCreate(current, self.tcpServer__)
@verbose
async def exit__(self):
    """Thread exit point: delete the server task and stop listening."""
    self.tasks.tcp_server = await delete(self.tasks.tcp_server)
    # Deleting the task does not close a server it already created, so
    # without this the listening socket would stay open after thread exit.
    if self.server is not None:
        self.server.close()
        self.server = None
    self.logger.debug("exit__: bye!")
@verbose
async def childsignalHandler__(self, signal, child):
    """Handle a signal sent by a child connection thread.

    PayloadSignal instances are logged; any other signal is ignored after
    the debug log line.
    """
    self.logger.debug("childsignalHandler__ : got signal %s from child %s", signal, child)
    if not isinstance(signal, PayloadSignal):
        return
    self.logger.info("Got %s from child with id %s", str(signal), child.getId())
# *** custom rescheduling tasks ***
async def tcpServer__(self):
    """Create the TCP server; on failure, retry after ``self.pause`` seconds.

    The server listens on ``self.port`` and passes each new connection to
    ``self.handler__``.
    """
    try:
        self.server = await asyncio.start_server(self.handler__, "", self.port)
    except asyncio.CancelledError:
        # The old message contained a "%i" placeholder with no argument,
        # so the literal "%i" was written to the log.
        self.logger.info("tcpServer__: cancel")
        self.server = None
    except Exception as e:
        self.logger.warning("tcpServer__: failed with '%s'", str(e))
        self.logger.warning("tcpServer__: will try again in %s secs", self.pause)
        await asyncio.sleep(self.pause)
        self.tasks.tcp_server = await reSchedule(self.tcpServer__)
    else:
        self.logger.debug("tcpServer__ : new server waiting")
# *** internal ***
@verbose
async def handler__(self, reader, writer):
    """Handle a new client connection by starting a TCPConnectionThread.

    When ``self.max_clients`` children already exist, the connection is
    refused and closed.

    :param reader: stream reader of the new connection
    :param writer: stream writer of the new connection
    """
    self.logger.debug("handler__ : new connection for %s", self.getInfo())
    # ">=" rather than ">": with ">" one connection more than max_clients
    # was accepted.
    if len(self.children) >= self.max_clients:
        self.logger.warning("handler__ : max number of connections is %s", self.max_clients)
        # Close the refused connection so neither side keeps a dangling socket.
        writer.close()
        return
    child_connection = TCPConnectionThread(reader=reader, writer=writer, parent=self)
    await child_connection.run()
    await self.addChild(child_connection)
if __name__ == "__main__":
    # use logging.DEBUG here for more verbose output
    loglev = logging.INFO
    # Same verbosity for the server and the connection thread classes.
    for logger_name in ("MasterTCPServerThread", "TCPConnectionThread"):
        logging.getLogger(logger_name).setLevel(loglev)
    thread = MasterTCPServerThread(parent=None, name="TCPServer")
    asyncio.get_event_loop().run_until_complete(thread.run())