Threads

Threads are the active components of a pipeline. The base class Limef::thread::Thread is templated on the frame type it handles specially. Each thread exposes Limef::thread::Thread::getInput() and Limef::thread::Thread::getOutput() as connection points.

namespace thread

multithreading

Non-template base class for all thread classes.

Defines the common interface for all thread classes. Used by ComposeThread to manage thread chains without needing to know the specific template types.

Each thread:

  • Has input and output framefilters for connecting to other threads

  • Can be started and stopped

  • Can be bound to specific CPU cores

  • Has configurable logging

class ComposeThread : public Limef::thread::ThreadBase
#include <compose.h>

Base class for composite threads.

Allows creating complex thread combinations by composing simpler ones. Subclasses create and chain their threads directly in the constructor call to chainThreads().

Example usage:

class StreamThread : public ComposeThread {
public:
    StreamThread(const std::string& name, const StreamContext& ctx) 
        : ComposeThread(name) {
        
        chainThreads({
            new LiveStreamThread("live", ctx),
            new PacketBufferThread("buffer", ctx),
            new DecodingThread("decode", ctx)
        });
    }
};

ComposeThread takes ownership of the created threads and handles:

  • Chaining outputs to inputs

  • Starting/stopping in correct order

  • Memory cleanup

  • Logger propagation

  • CPU affinity assignment

IMPORTANT: when calling start for a ComposeThread, the threads are started in reverse order (in the current example: DecodingThread first, then PacketBufferThread, etc.) This assures that downstream receiving threads are ready & do not miss any frames sent by the upstream threads.

Subclassed by Limef::thread::MediaFileThread

Public Functions

virtual void start() override

Start the thread.

Initializes internal structures and starts the thread’s main loop. Thread subclasses may perform additional initialization.

virtual bool isStarted() const override

Check if start has been called or not.

virtual void requestStop() override

Request thread termination.

Sends termination signal to the thread. Does not wait for completion.

virtual void waitStop() override

Wait for thread termination.

Blocks until the thread has completely stopped.

virtual void stop() override

Stop the thread.

Combines requestStop() and waitStop() for convenience.

virtual void setAffinity(int core_id) override

Bind thread to specific CPU core.

Parameters:

core_id – CPU core number to bind to

virtual void setLogLevel(spdlog::level::level_enum level) override

Set thread’s log level.

virtual void setLogFormat(const std::string &format) override

Set thread’s log format.

virtual void shortLogFormat() override

Set a nice minimal format for thread’s logger.

virtual spdlog::level::level_enum getLogLevel() const override

Get thread’s current log level.

virtual Limef::ff::FrameFilter &getInput() override

Get thread’s input framefilter.

Returns:

Reference to input framefilter

virtual Limef::ff::FrameFilter &getOutput() override

Get thread’s output framefilter.

Returns:

Reference to output framefilter

class ConsumerThread : public Limef::thread::Thread<Limef::frame::PacketFrame>
#include <consumer.h>

An example consumer thread.

An example thread that consumes frames Consumes PacketFrames at a certain framerate from the input framefilter. Does not pass anything downstream to output framefilter.

class ExamplePollThread : public Limef::thread::PollThread<Limef::frame::PacketFrame>
#include <pollthread.h>

Concrete example of PollThread for testing.

Demonstrates PollThread with PacketFrames and a periodic task.

class ExampleSignal : public Limef::signal::Signal
#include <examplethread.h>

An example signal for ExampleThread

An example custom signal class for the ExampleThread thread class

class ExampleThread : public Limef::thread::Thread<Limef::frame::PacketFrame>
#include <examplethread.h>

An example thread.

An example thread with custom signals, task and frame processing. Receives PacketFrames from upstream (input framefilter) and simply passes them downstream (output framefilter). Executes a periodic task (every 2 secs). Demonstrates intercommunication between thread front- and backend.

Public Functions

void pingCall()

Frontend method that calls backend method ExampleThread::pingCall__

void pingCall2(int param)

Frontend method that calls backend method ExampleThread::pingCall2__

struct LiveStreamContext
#include <livestreamcontext.h>

Configuration context for live stream reader.

Public Members

const std::string url = {""}

Stream URL (e.g., “rtsp://camera.example.com/stream”)

const Slot slot = {1}

Slot identifying this stream.

int timeout_sec = {5}

Read timeout in seconds.

std::string rtsp_transport = {"tcp"}

RTSP transport (tcp/udp) for RTSP sources.

int max_delay_ms = {500}

Maximum allowed network delay for RTSP sources.

int reopen_retry_max = {-1}

Maximum number of reopen attempts: -1: infinite, 0: none, etc.

int timeout_start_sec = {5}

Start value for retry timeout.

int timeout_max_sec = {60}

Max value for retry timeout.

int factor = {2}

Factor to increase retry timeouts.

class LiveStreamThread : public Limef::thread::Thread<Limef::frame::NullFrame>
#include <livestream.h>

Thread for reading live media streams using FFmpeg.

Handles live media sources like RTSP cameras using FFmpeg’s interrupt callback system for proper timeout handling. Sends CodecFrame on startup followed by PacketFrames containing media data.

Example protocols:

  • RTSP (rtsp://)

  • HTTP Live Streaming (http://)

  • UDP/RTP streams (udp://)

Public Functions

LiveStreamThread(std::string name, const LiveStreamContext &stream_ctx)

Construct a new Live Stream Reader Thread.

Parameters:
  • name – Thread name for identification

  • stream_ctx – LiveStreamContext defining the live stream

Throws:

std::runtime_error – if stream cannot be opened or initialized

struct MediaFileContext
#include <mediafile0.h>

Public Members

const std::string filename = {""}

Path to media file.

const Slot slot = {1}

Slot identifying this stream.

int fps = {0}

Target fps for feeding frames. 0 = feed frames at infinite speed. -1 = feed frames at play speed.

int loop = {-1}

Loop the file from start at EOF. -1 = no looping, otherwise, the pause in milliseconds before looping.

class MediaFileSignal : public Limef::signal::Signal
#include <mediafile0.h>

Specific signals to MediaFileThread0.

class MediaFileThread : public Limef::thread::ComposeThread
#include <mediafile.h>

A safe media playback thread.

Creates a chain of:

  1. MediaFileThread1 for reading files and transcoding / checking them to some standard form

  2. OrderedPacketBufferThread for DTS ordering and buffering

NOTE: MediaFileThread1 includes conversion to AAC

NOTE: thread starting order will be (2) -> (1)

                thread border
MediaFileThread1 ->| OrderedPacketBufferThread

class MediaFileThread0 : public Limef::thread::ProducerThread
#include <mediafile0.h>

Thread for reading media files using FFmpeg.

Reads packets from a media file and sends them downstream:

  1. On startup, sends CodecFrame with stream information

  2. Continuously sends PacketFrames with media data

Uses ProducerThread’s timing system to control packet output rate

Dumps media downstream as-is - no internal conversions

Subclassed by Limef::thread::MediaFileThread1

Public Functions

MediaFileThread0(std::string name, const MediaFileContext &ctx)

Construct a new Media File Reader Thread.

Parameters:
  • name – Thread name for identification

  • ctx – MediaFileContext identifying the stream

Throws:

std::runtime_error – if file cannot be opened or stream info cannot be read

virtual void handleSignalFrame(const Limef::frame::SignalFrame *signalFrame) override

Handles custom signals as per MediaFileSignal

void resendCodecFrame()

frontend-method: tell thread to send CodecFrame again downstream

class MediaFileThread1 : public Limef::thread::MediaFileThread0
#include <mediafile1.h>

MediaFileThread1.

Extends MediaFileThread0 to do some encoding and codec checks in order to ensure a “canonical” / standard stream for our framework At the moment, consists of the following:

  • All incoming audio is encoded into AAC

  • TODO: video codec checks?

Public Functions

inline virtual Limef::ff::FrameFilter &getOutput() override

Get thread’s output framefilter.

Returns:

Reference to output framefilter

inline virtual void setLogLevel(spdlog::level::level_enum level) override

Set thread’s log level.

inline virtual void setLogFormat(const std::string &format) override

Set thread’s log format.

class OrderedPacketBufferThread : public Limef::thread::PacketBufferThread
#include <orderedpacketbuffer.h>

Thread for buffering PacketFrames with DTS ordering.

Extends PacketBufferThread to use OrderedFrameFifo, which maintains video packets in DTS order. Otherwise methods identical to PacketBufferThread.

class PacketBufferThread : public Limef::thread::Thread<Limef::frame::PacketFrame>
#include <packetbuffer.h>

Thread for buffering PacketFrames.

This thread immediately terminates the filter chain for PacketFrames by:

  1. Making a copy of incoming PacketFrames using framefifo’s stack

  2. Placing the copy into framefifo’s queue

  3. Processing frames from the queue in its own thread context

Subclassed by Limef::thread::OrderedPacketBufferThread

Public Functions

PacketBufferThread(std::string name, const Limef::FrameFifoContext &ctx)

Construct a new Packet Buffer Thread.

Parameters:
  • name – Thread name for identification

  • ctx – FrameFifoContext defining the framefifo parameters

struct PerSocketData
#include <websocket.h>
template<typename T>
class PollThread : public Limef::thread::Thread<T>
#include <pollthread.h>

Example thread demonstrating PollFrameFifo usage.

This thread uses PollFrameFifo instead of the regular FrameFifo, allowing the main loop to multiplex frame reading with other I/O operations (sockets, pipes, etc.) using poll().

Key differences from regular Thread:

  • Overrides createFrameFifo() to create PollFrameFifo

  • Overrides mainLoop() to use poll() instead of blocking read()

  • Provides getFifoFd() for subclasses to include in their poll set

Usage pattern for subclasses (e.g., RTSPServerThread):

void MyServerThread::mainLoop() {
    preRun();
    sendSignal__(BasicSignal(BasicSignal::Operation::started));

    std::vector<pollfd> fds;
    fds.push_back({listen_fd_, POLLIN, 0});
    fds.push_back({getFifoFd(), POLLIN, 0});

    while (!stop_requested_) {
        int timeout_ms = computeTaskTimeout();
        poll(fds.data(), fds.size(), timeout_ms);

        // Handle incoming frames
        if (fds[1].revents & POLLIN) {
            while (auto* frame = pollFrameFifo()->tryRead()) {
                frameTask(frame);
            }
        }

        // Handle socket I/O
        if (fds[0].revents & POLLIN) {
            acceptClient();
        }

        task();
    }
    postRun();
}

class ProducerThread : public Limef::thread::Thread<Limef::frame::NullFrame>
#include <producer.h>

A prototype frame producing thread (aka source)

An example and base class for Threads that produce frames, i.e. for frame sources. This example implementation produces and sends NullFrames to it’s output framefilter.

Subclassed by Limef::thread::MediaFileThread0, Limef::thread::USBCameraThread

template<typename T>
class Thread : public Limef::thread::ThreadBase
#include <thread.h>

A multithreading base class.

Thread class includes:

  • thread “frontend” methods: you are supposed to call these from the main thread.

  • thread “backend” methods: these are used internally by the thread after it has been spawned.

  • a single, unified fifo for handling incoming Frames and commands, the FrameFifo.

Subclassed by Limef::thread::PollThread< Limef::frame::RTPPacketFrame >, Limef::thread::PollThread< Limef::frame::PacketFrame >, Limef::thread::PollThread< T >

Public Functions

Thread(std::string name, const Limef::FrameFifoContext &ctx)

Default ctor

Parameters:
  • name – a unique name identifying this thread

  • ctx – a FrameFifoContext defining the framefifo

virtual void start() override

sets up thread and starts mainLoop

inline virtual bool isStarted() const override

Check if start has been called or not.

virtual void requestStop() override

has the thread been started or not?

request thread termination

virtual void waitStop() override

wait for thread termination

virtual void stop() override

combined requestStop() and waitStop()

virtual void setAffinity(int core_id) override

Binds the thread to a certain CPU.

std::unique_ptr<Limef::signal::Signal> read(std::chrono::microseconds timeout = 0us)

read a signal at the frontend. 0 means wait forever -> uses fifo_mutex_ & fifo_condition_

Read a signal at the thread frontend (main python thread) sent by the thread backend through an internal fifo. Example:

auto signal2 = read();
if (auto* basicSignal = signal2->as<SomeSignalClass>()) {
}

inline virtual Limef::ff::FrameFilter &getInput() override

returns thread’s input FrameFilter

inline virtual Limef::ff::FrameFilter &getOutput() override

returns thread’s output FrameFilter

virtual void setLogLevel(spdlog::level::level_enum level) override

Sets thread logger’s loglevel.

virtual void setLogFormat(const std::string &format) override

Sets thread logger’s format.

virtual void shortLogFormat() override

Shortcut method for setting a nice minimal format for thread’s logger.

virtual spdlog::level::level_enum getLogLevel() const override

Get thread logger’s current log level.

class ThreadBase
#include <threadbase.h>

Subclassed by Limef::thread::Thread< Limef::frame::RTPPacketFrame >, Limef::thread::Thread< Limef::frame::PacketFrame >, Limef::thread::Thread< Limef::frame::NullFrame >, Limef::thread::Thread< Limef::frame::RawFrame >, Limef::thread::ComposeThread, Limef::thread::Thread< T >

Public Functions

virtual void start() = 0

Start the thread.

Initializes internal structures and starts the thread’s main loop. Thread subclasses may perform additional initialization.

virtual bool isStarted() const = 0

Check if start has been called or not.

virtual void requestStop() = 0

Request thread termination.

Sends termination signal to the thread. Does not wait for completion.

virtual void waitStop() = 0

Wait for thread termination.

Blocks until the thread has completely stopped.

virtual void stop() = 0

Stop the thread.

Combines requestStop() and waitStop() for convenience.

virtual void setAffinity(int core_id) = 0

Bind thread to specific CPU core.

Parameters:

core_id – CPU core number to bind to

virtual void setLogLevel(spdlog::level::level_enum level) = 0

Set thread’s log level.

virtual void setLogFormat(const std::string &format) = 0

Set thread’s log format.

virtual void shortLogFormat() = 0

Set a nice minimal format for thread’s logger.

virtual spdlog::level::level_enum getLogLevel() const = 0

Get thread’s current log level.

virtual Limef::ff::FrameFilter &getInput() = 0

Get thread’s input framefilter.

Returns:

Reference to input framefilter

virtual Limef::ff::FrameFilter &getOutput() = 0

Get thread’s output framefilter.

Returns:

Reference to output framefilter

struct USBCameraContext
#include <usbcamera.h>

Configuration for USB camera capture.

Public Members

std::string device = {"/dev/video0"}

V4L2 device path.

Slot slot = {1}

Slot identifying this stream.

int width = {640}

Requested capture width.

int height = {480}

Requested capture height.

int fps = {30}

Requested frame rate.

AVPixelFormat output_format = {AV_PIX_FMT_NV12}

Output format after SwScale (NV12 for GPU)

class USBCameraThread : public Limef::thread::ProducerThread
#include <usbcamera.h>

Thread for capturing frames from USB cameras.

Reads raw frames from a V4L2 device, converts them to the target pixel format using SwScale, and sends them downstream as DecodedFrames.

On startup, sends a StreamFrame with video stream information. Then continuously sends DecodedFrames with pixel data.

Public Functions

USBCameraThread(std::string name, const USBCameraContext &ctx)

Construct USB camera capture thread.

Parameters:
  • name – Thread name for logging/identification

  • ctx – Camera configuration (device, resolution, format)

Throws:

std::runtime_error – if device cannot be opened

class WebSocketServerSignal : public Limef::signal::Signal
#include <websocket.h>

Single signal class for WebSocketServerThread with signal type enum

class WebSocketServerThread : public Limef::thread::Thread<Limef::frame::RawFrame>
#include <websocket.h>

Ultra-fast WebSocket server for streaming frag-MP4 video

Features:

  • Zero-copy streaming where possible

  • Lockfree hot path for frame handling

  • Automatic initialization segment caching (FTYP + MOOV)

  • Keyframe-synchronized client onboarding

  • Token-based stream access control

  • Multiple clients per stream

  • Automatic cleanup of dead connections

All incoming ftyp and moov packets per slot are cached

Complete walkthrough:

 media_thread:
  class: MediaFileThread
  note: sends CodecFrame only once at thread startup
  fork:
    fmp4:
      class: FMP4FrameFilter
      internal:
        gate:
          class: GateFrameFilter
          note: | 
            although initially closed, is always listening and caching
            CodecFrame(s).  When gate is opened, passes immediately CodecFrame downstream
        transcode:
          class: AudioTranscodeFrameFilter
          note: transcodes to AAC and sends modified CodecFrame downstream
        muxer:
          class: MuxerFrameFilter
          note: | 
            requires CodecFrame to start muxing
            for fmp4, CodecFrame is necessary so that ftyp and moov get emitted
        partitioner:
          class: MP4PartitionFrameFilter
      fork:
        websocket:
          class: WebSocketServerThread
 
Suppose gate first closed.  Once it is opened, sends CodecFrame immediately downstream,
muxer is initialized, ftyp and moov are sent correctly downstream

 *

Public Functions

bool startServer(int port = 8080)

Start WebSocket server on specified port

bool setSlotUUID(Slot slot, const std::string &uuid)

Register UUID <-> Slot mapping Called when camera/stream is added to system

bool addStreamToken(const std::string &token, const std::vector<std::string> &allowed_uuids, int64_t expires_timestamp = 0)

Add streaming token with allowed UUIDs

Parameters:
  • token – Access token string

  • allowed_uuids – List of stream UUIDs this token can access

  • expires_timestamp – Unix timestamp when token expires (0 = never)

inline bool isServerRunning() const

Get server status (lockfree)

class WebSocketStreamBuffer
#include <websocket.h>

Helper class for per-stream data structures Handles initialization segment caching and client management