Lesson 4 : Receiving Frames at Python

Here we start with two separate python programs: (1) a server that reads RTSP cameras and writes RGB frames into shared memory and (2) a client that reads those RGB frames from memory. For the client program, two versions are provided, the API level 2 being the most compact one.

Such scheme is only for demo/tutorial purposes. Normally you would start both the server and client from within the same python program. We give an example of that as well.

Server side

Download server side [here]

By now, we have learned how to receive, decode and send streams to the x window system. In this chapter, we do all that, but at the same time, also send copies of the decoded frames to another python process.

The filtergraph looks like this:

(LiveThread:livethread) -------------------------------------+  main branch, streaming
                                                             |
{ForkFrameFilter: fork_filter} <----(AVThread:avthread) << --+  main branch, decoding
               |
      branch 1 +->> (OpenGLThread:glthread)
               |
      branch 2 +--> {IntervalFrameFilter: interval_filter} --> {SwScaleFrameFilter: sws_filter} --> {RGBSharedMemFrameFilter: shmem_filter}

We are using the ForkFrameFilter to branch the decoded stream into two branches. Branch 1 goes to screen, while branch 2 does a lot of new stuff.

In branch 2, IntervalFrameFilter passes a frame through on regular intervals. In our case we are going to use an interval of 1 second, i.e. even if your camera is sending 25 fps, at the other side of IntervalFrameFilter we’ll be observing only 1 fps.

SwScaleFrameFilter does YUV => RGB interpolation on the CPU. The final, interpolated RGB frame is passed to the posix shared memory with the RGBSharedMemFrameFilter. From there it can be read by another python process.

(Remember that branch 1 does YUV => RGB interpolation as well, but on the GPU (and at 25 fps rate))

To summarize, branch 1 interpolates once a second a frame to RGB and passes it to shared memory. The size of the frame can be adjusted.

Let’s start the construction of the filtergraph by defining some parameters. Frames are passed to SwScaleFrameFilter at 1000 millisecond intervals. The image dimensions of the frame passed into shared memory, will be one quarter of a full-hd frame:

# define yuv=>rgb interpolation interval
image_interval=1000  # YUV => RGB interpolation to the small size is done each 1000 milliseconds and passed on to the shmem ringbuffer

# define rgb image dimensions
width  =1920//4
height =1080//4

RGBSharedMemFrameFilter needs also a unique name and the size of the shared memory ring-buffer:

# posix shared memory
shmem_name    ="lesson_4"      # This identifies posix shared memory - must be unique
shmem_buffers =10              # Size of the shmem ringbuffer

Next, we construct the filterchain as usual, from end-to-beginning:

# branch 1
glthread        =OpenGLThread("glthread")
gl_in_filter    =glthread.getFrameFilter()

# branch 2
shmem_filter    =RGBShmemFrameFilter(shmem_name, shmem_buffers, width, height)
# shmem_filter    =BriefInfoFrameFilter("shmem") # a nice way for debugging to see of you are actually getting any frames here ..
sws_filter      =SwScaleFrameFilter("sws_filter", width, height, shmem_filter)
interval_filter =TimeIntervalFrameFilter("interval_filter", image_interval, sws_filter)

# fork
fork_filter     =ForkFrameFilter("fork_filter", gl_in_filter, interval_filter)

# main branch
avthread        =AVThread("avthread",fork_filter)
av_in_filter    =avthread.getFrameFilter()
livethread      =LiveThread("livethread")

Define connection to camera: frames from 192.168.1.41 are written to live_out_filter and tagged with slot number 1:

# ctx =LiveConnectionContext(LiveConnectionType_rtsp, "rtsp://admin:nordic12345@192.168.1.41", 1, av_in_filter)
ctx =LiveConnectionContext(LiveConnectionType_rtsp, "rtsp://admin:123456@192.168.0.134", 1, av_in_filter)

Start processes, stream for 60 seconds and exit:

glthread.startCall()
avthread.startCall()
livethread.startCall()

# start decoding
avthread.decodingOnCall()

livethread.registerStreamCall(ctx)
livethread.playStreamCall(ctx)

# create an X-window
window_id =glthread.createWindow()
glthread.newRenderGroupCall(window_id)

# maps stream with slot 1 to window "window_id"
context_id=glthread.newRenderContextCall(1,window_id,0)

time.sleep(60)

glthread.delRenderContextCall(context_id)
glthread.delRenderGroupCall(window_id)

# stop decoding
avthread.decodingOffCall()

# stop threads
livethread.stopCall()
avthread.stopCall()
glthread.stopCall()

print("bye")

Note

In the previous lessons, all streaming has taken place at the cpp level. Here we are starting to use posix shared memory and semaphores in order to share frames between python processes, with the ultimate goal to share them with machine vision processes. However, if you need very high-resolution and high fps solutions, you might want to implement the sharing of frames and your machine vision routines directly at the cpp level.

Client side: API level 2

Download client side API level 2 [here]

This is a separate python program for reading the frames that are written by Valkka to the shared memory.

The parameters used both in the server side (above) and on the client side (below) must be exactly the same and the client program should be started after the server program (and while the server is running). Otherwise undefined behaviour will occur.

The used shmem_name(s) should be same in both server and client, but different for another server/client pair.

from valkka.api2 import ShmemRGBClient

width = 1920//4
height = 1080//4

# This identifies posix shared memory - must be same as in the server side
shmem_name = "lesson_4"
shmem_buffers = 10              # Size of the shmem ringbuffer

client = ShmemRGBClient(
    name=shmem_name,
    n_ringbuffer=shmem_buffers,
    width=width,
    height=height,
    mstimeout=1000,        # client timeouts if nothing has been received in 1000 milliseconds
    verbose=False
)

The mstimeout defines the semaphore timeout in milliseconds, i.e. the time when the client returns even if no frame was received:

while True:
    index, meta = client.pullFrame()
    if (index == None):
        print("timeout")
    else:
        data = client.shmem_list[index][0:meta.size]
        data = data.reshape((meta.height, meta.width, 3))
        print("got data: ", data.shape)

The client.shmem_list is a list of numpy arrays, while isize defines the extent of data in the array. This example simply prints out the first ten bytes of the RGB image.

Client side: openCV

Download client side openCV example [here]

OpenCV is a popular machine vision library. We modify the previous example to make it work with openCV like this:

import cv2
from valkka.api2 import ShmemRGBClient

width = 1920//4
height = 1080//4

# This identifies posix shared memory - must be same as in the server side
shmem_name = "lesson_4"
shmem_buffers = 10              # Size of the shmem ringbuffer

client = ShmemRGBClient(
    name=shmem_name,
    n_ringbuffer=shmem_buffers,
    width=width,
    height=height,
    mstimeout=1000,        # client timeouts if nothing has been received in 1000 milliseconds
    verbose=False
)

while True:
    index, meta = client.pullFrame()
    if (index == None):
        print("timeout")
        continue
    data = client.shmem_list[index][0:meta.size]
    print("data   : ", data[0:min(10, meta.size)])
    print("width  : ", meta.width)
    print("height : ", meta.height)
    print("slot   : ", meta.slot)
    print("time   : ", meta.mstimestamp)
    img = data.reshape((meta.height, meta.width, 3))
    # img2 =imutils.resize(img, width=500)
    img = cv2.GaussianBlur(img, (21, 21), 0)
    print("got frame", img.shape)
    ## depending on how you have installed your openCV, this might not work:
    ## (use at your own risk)
    #cv2.imshow("valkka_opencv_demo", img)
    #cv2.waitKey(1)

After receiving the RGB frame, some gaussian blur is applied to the image. Then it is visualized using openCV’s own “high-gui” infrastructure. If everything went ok, you should see a blurred image of your video once in a second.

Start this script after starting the server side script (server side must also be running).

Client side: API level 1

Download client side example [here]

API level 2 provides extra wrapping. Let’s see what goes on at the lowest level (plain, cpp wrapped python code).

from valkka.core import *

width = 1920//4
height = 1080//4

shmem_name = "lesson_4"  # This identifies posix shared memory - must be unique
shmem_buffers = 10       # Size of the shmem ringbuffer

The wrapped cpp class is SharedMemRingBufferRGB (at the server side, RGBShmemFrameFilter is using SharedMemRingBufferRGB):

shmem = SharedMemRingBufferRGB(shmem_name, shmem_buffers, width, height,
                               1000, False)  # shmem id, buffers, w, h, timeout, False=this is a client

Next, get handles to the shared memory as numpy arrays:

shmem_list = []
for i in range(shmem_buffers):
    # getNumpyShmem defined in the swig interface file
    shmem_list.append(getNumpyShmem(shmem, i))
    print("got element i=", i)

Finally, start reading frames.

shmem.clientPullPy() returns a tuple with the shared memory ringbuffer index and metadata.

while True:
    tup = shmem.clientPullPy()
    index = tup[0]
    if index < 0:
        print("timeout")
        continue
    isize         = tup[1]
    width         = tup[2]
    height        = tup[3]
    slot          = tup[4]
    mstimestamp   = tup[5]
    print("got index, size =", index, isize)
    ar = shmem_list[index][0:isize]  # this is just a numpy array
    ar = ar.reshape((height, width, 3)) # this is your rgb image
    print("payload         =", ar.shape)

Cpp documentation for Valkka shared memory classes be found here.

Server + Client

Download server + client example [here]

Here we have a complete example running both server & client within the same python file.

You could wrap the client part further into a python thread, releasing your main python process to, say, run a GUI.

Yet another possibility is to run the server and client in separate multiprocesses. In this case one must be extra carefull to spawn the multiprocesses before instantiating any libValkka objects, since libValkka relies heavily on multithreading (this is the well-known “fork must go before threading” problem).

These problems have been addressed/resolved more deeply in the valkka-live video surveillance client.

But let’s turn back to the complete server + client example

import time
from valkka.core import *
from valkka.api2 import ShmemRGBClient

The filtergraph, once again:

(LiveThread:livethread) -------------------------------------+  main branch, streaming
                                                             |
{ForkFrameFilter: fork_filter} <----(AVThread:avthread) << --+  main branch, decoding
               |
      branch 1 +->> (OpenGLThread:glthread)
               |
      branch 2 +--> {IntervalFrameFilter: interval_filter} --> {SwScaleFrameFilter: sws_filter} --> {RGBSharedMemFrameFilter: shmem_filter}
# define yuv=>rgb interpolation interval
image_interval=1000  # YUV => RGB interpolation to the small size is done each 1000 milliseconds and passed on to the shmem ringbuffer

# define rgb image dimensions
width  =1920//4
height =1080//4

RGBSharedMemFrameFilter needs unique name and the size of the shared memory ring-buffer:

# posix shared memory
shmem_name    ="lesson_4"      # This identifies posix shared memory - must be unique
shmem_buffers =10              # Size of the shmem ringbuffer

Next, we construct the filterchain as usual, from end-to-beginning:

# branch 1
glthread        =OpenGLThread("glthread")
gl_in_filter    =glthread.getFrameFilter()

# branch 2
shmem_filter    =RGBShmemFrameFilter(shmem_name, shmem_buffers, width, height)
# shmem_filter    =BriefInfoFrameFilter("shmem") # a nice way for debugging to see of you are actually getting any frames here ..
sws_filter      =SwScaleFrameFilter("sws_filter", width, height, shmem_filter)
interval_filter =TimeIntervalFrameFilter("interval_filter", image_interval, sws_filter)

# fork
fork_filter     =ForkFrameFilter("fork_filter", gl_in_filter, interval_filter)

# main branch
avthread        =AVThread("avthread",fork_filter)
av_in_filter    =avthread.getFrameFilter()
livethread      =LiveThread("livethread")

Define connection to camera: frames from the IP camera are written to live_out_filter and tagged with slot number 1:

# ctx =LiveConnectionContext(LiveConnectionType_rtsp, "rtsp://admin:nordic12345@192.168.1.41", 1, av_in_filter)
ctx =LiveConnectionContext(LiveConnectionType_rtsp, "rtsp://admin:123456@192.168.0.134", 1, av_in_filter)

Start threads:

glthread.startCall()
avthread.startCall()
livethread.startCall()

# start decoding
avthread.decodingOnCall()

livethread.registerStreamCall(ctx)

# create an X-window
window_id =glthread.createWindow()
glthread.newRenderGroupCall(window_id)

# maps stream with slot 1 to window "window_id"
context_id=glthread.newRenderContextCall(1,window_id,0)

Ok, the server is alive and running. Let’s do the client part for receiving frames.

client = ShmemRGBClient(
    name=shmem_name,
    n_ringbuffer=shmem_buffers,
    width=width,
    height=height,
    mstimeout=1000,        # client timeouts if nothing has been received in 1000 milliseconds
    verbose=False
)

The client is ready to go. Before starting to receive frames, start playing the RTSP camera

livethread.playStreamCall(ctx)

Read 10 frames & exit

print("client starting")
cc = 0
while True:
    index, meta = client.pullFrame()
    if (index == None):
        print("timeout")
    else:
        data = client.shmem_list[index][0:meta.size]
        data = data.reshape((meta.height, meta.width, 3))
        print("got data: ", data.shape)
        cc += 1
    if cc >= 10:
        break

print("stopping..")

Clear the server

glthread.delRenderContextCall(context_id)
glthread.delRenderGroupCall(window_id)

# stop decoding
avthread.decodingOffCall()

# stop threads
livethread.stopCall()
avthread.stopCall()
glthread.stopCall()

time.sleep(1)

print("bye")

Receiving frag-MP4 at Python

Download frag-MP4 example [here]

Fragmented MP4 (frag-MP4) is a container format suitable for live streaming and playing the video in most web browsers. For more information about this, see here.

With libValkka you can mux your IP camera’s H264 stream on-the-fly into frag-MP4 and then push it into cloud, using Python3 only.

This is similar what we have just done for the RGB bitmap frames. Now, instead of RGB24 frames, we receive frag-MP4 to the python side.

And, of course, we could do all the following things simultaneously: decode, show on screen, push RGB24 frames for video analysis, push frag-MP4 to your browser, etc. However, for clarity, here we just show the video on screen & receive frag-MP4 frames in our python process.

import time
from valkka.core import *
from valkka.api2 import FragMP4ShmemClient

The filtergraph for simultaneous video viewing and frag-MP4 muxing looks like this:

(LiveThread:livethread) -->----------------------------------+  main branch (forks into two)
                                                             |
(OpenGLThread:glthread) <----(AVThread:avthread) << ---------+  decoding branch
                                                             |
     +-----------------------------------------<-------------+  mux branch
     |
     +--> {FragMP4MuxFrameFilter:fragmp4muxer} --> {FragMP4ShmemFrameFilter:fragmp4shmem}
shmem_buffers = 10 # 10 element in the ring-buffer
shmem_name = "lesson_4_c" # unique name identifying the shared memory
cellsize = 1024*1024*3 # max size for each MP4 fragment
timeout = 1000 # in ms

# decoding branch
glthread        =OpenGLThread("glthread")
gl_in_filter    =glthread.getFrameFilter()
avthread        =AVThread("avthread",gl_in_filter)
av_in_filter    =avthread.getFrameFilter()

# mux branch
shmem_filter    =FragMP4ShmemFrameFilter(shmem_name, shmem_buffers, cellsize)
mux_filter      =FragMP4MuxFrameFilter("fragmp4muxer", shmem_filter)
mux_filter.activate() # don't forget!

# fork
fork_filter     =ForkFrameFilter("fork_filter", av_in_filter, mux_filter)

# main branch
livethread      =LiveThread("livethread")

Define connection to camera: frames from the IP camera are written to live_out_filter and tagged with slot number 1:

# ctx =LiveConnectionContext(LiveConnectionType_rtsp, "rtsp://admin:nordic12345@192.168.1.41", 1, fork_filter)
ctx =LiveConnectionContext(LiveConnectionType_rtsp, "rtsp://admin:123456@192.168.0.134", 1, fork_filter)

Start threads:

glthread.startCall()
avthread.startCall()
livethread.startCall()

# start decoding
avthread.decodingOnCall()

livethread.registerStreamCall(ctx)

# create an X-window
window_id =glthread.createWindow()
glthread.newRenderGroupCall(window_id)

# maps stream with slot 1 to window "window_id"
context_id=glthread.newRenderContextCall(1,window_id,0)

Ok, the server is alive and running. Let’s do the client part for receiving frames.

client = FragMP4ShmemClient(
    name=shmem_name,
    n_ringbuffer=shmem_buffers,
    n_size=cellsize,
    mstimeout=timeout,
    verbose=False
)

The client is ready to go. Before starting to receive frames, start playing the RTSP camera

livethread.playStreamCall(ctx)

Read 10 frames & exit

print("client starting")
cc = 0
while True:
    index, meta = client.pullFrame()
    if (index == None):
        print("timeout")
    else:
        data = client.shmem_list[index][0:meta.size]
        print("got", meta.name.decode("utf-8"), "of size", meta.size)
        cc += 1
    if cc >= 100:
        break

print("stopping..")

mux_filter.deActivate() # don't forget!

Clear the server

glthread.delRenderContextCall(context_id)
glthread.delRenderGroupCall(window_id)

# stop decoding
avthread.decodingOffCall()

# stop threads
livethread.stopCall()
avthread.stopCall()
glthread.stopCall()

print("bye")

Multiprocessing

By now you have learned the basics of passing frames from the libValkka infrastructure into python.

When creating more serious solutions, you typically create several python multiprocesses in a single python program, using python multiprocessing and the valkka-multiprocess module.

In these cases you must remember to span all multiprocesses in the very beginning of your code and then arrange an interprocess communication between them, so that the multiprocesses will instantiate the server and client in the correct order.

You can also create shared memory servers, where you can feed frames from the python side (vs. at the cpp side)

LibValkka shared memory server and client also features a posix file-descriptor API. It is convenient in cases, where a single process is listening simultaneously to several shared memory servers, and you want to do the i/o efficiently: you can use python’s “select” module to do efficient “multiplexing” of pulling frames from several shmem clients.

Here are some possibilities:

  1. Several shared memory servers, each one sending video from one camera.

  2. Several client processes, each one receiving video from a shared memory server. Each client process establishes it’s own shared memory server for further sharing of the frames.

  3. A master process that listens to multiple clients at the same time.

So (1) works at the cpp side, then, for example (2) is a separate multiprocess running OpenCV-based analysis and (3) is a common Yolo object detector for all the clients.

That was a lot to take in! Fortunately we provide simplified examples of all the topics mentioned above: please see the multiprocessing examples.

For full-blown programs (however, these are maybe too complex and not always up-to-date), please see valkka-live and valkka-streamer.