Threads¶
Threads are the active components of a pipeline. The base class
Limef::thread::Thread is templated on the frame type it handles specially.
Each thread exposes Limef::thread::Thread::getInput() and
Limef::thread::Thread::getOutput() as connection points.
-
namespace thread¶
multithreading
Non-template base class for all thread classes.
Defines the common interface for all thread classes. Used by ComposeThread to manage thread chains without needing to know the specific template types.
Each thread:
Has input and output framefilters for connecting to other threads
Can be started and stopped
Can be bound to specific CPU cores
Has configurable logging
-
class ComposeThread : public Limef::thread::ThreadBase¶
- #include <compose.h>
Base class for composite threads.
Allows creating complex thread combinations by composing simpler ones. Subclasses create and chain their threads directly in the constructor call to chainThreads().
Example usage:
class StreamThread : public ComposeThread { public: StreamThread(const std::string& name, const StreamContext& ctx) : ComposeThread(name) { chainThreads({ new LiveStreamThread("live", ctx), new PacketBufferThread("buffer", ctx), new DecodingThread("decode", ctx) }); } };
ComposeThread takes ownership of the created threads and handles:
Chaining outputs to inputs
Starting/stopping in correct order
Memory cleanup
Logger propagation
CPU affinity assignment
IMPORTANT: when calling start for a ComposeThread, the threads are started in reverse order (in the current example: DecodingThread first, then PacketBufferThread, etc.) This assures that downstream receiving threads are ready & do not miss any frames sent by the upstream threads.
Subclassed by Limef::thread::MediaFileThread
Public Functions
-
virtual void start() override¶
Start the thread.
Initializes internal structures and starts the thread’s main loop. Thread subclasses may perform additional initialization.
-
virtual bool isStarted() const override¶
Check if start has been called or not.
-
virtual void requestStop() override¶
Request thread termination.
Sends termination signal to the thread. Does not wait for completion.
-
virtual void waitStop() override¶
Wait for thread termination.
Blocks until the thread has completely stopped.
-
virtual void stop() override¶
Stop the thread.
Combines requestStop() and waitStop() for convenience.
-
virtual void setAffinity(int core_id) override¶
Bind thread to specific CPU core.
- Parameters:
core_id – CPU core number to bind to
-
virtual void setLogLevel(spdlog::level::level_enum level) override¶
Set thread’s log level.
-
virtual void setLogFormat(const std::string &format) override¶
Set thread’s log format.
-
virtual void shortLogFormat() override¶
Set a nice minimal format for thread’s logger.
-
virtual spdlog::level::level_enum getLogLevel() const override¶
Get thread’s current log level.
-
virtual Limef::ff::FrameFilter &getInput() override¶
Get thread’s input framefilter.
- Returns:
Reference to input framefilter
-
virtual Limef::ff::FrameFilter &getOutput() override¶
Get thread’s output framefilter.
- Returns:
Reference to output framefilter
-
class ConsumerThread : public Limef::thread::Thread<Limef::frame::PacketFrame>¶
- #include <consumer.h>
An example consumer thread.
An example thread that consumes frames Consumes
PacketFrames at a certain framerate from the input framefilter. Does not pass anything downstream to output framefilter.
-
class ExamplePollThread : public Limef::thread::PollThread<Limef::frame::PacketFrame>¶
- #include <pollthread.h>
Concrete example of PollThread for testing.
Demonstrates PollThread with PacketFrames and a periodic task.
-
class ExampleSignal : public Limef::signal::Signal¶
- #include <examplethread.h>
An example signal for
ExampleThreadAn example custom signal class for the
ExampleThreadthread class
-
class ExampleThread : public Limef::thread::Thread<Limef::frame::PacketFrame>¶
- #include <examplethread.h>
An example thread.
An example thread with custom signals, task and frame processing. Receives
PacketFrames from upstream (input framefilter) and simply passes them downstream (output framefilter). Executes a periodic task (every 2 secs). Demonstrates intercommunication between thread front- and backend.
-
struct LiveStreamContext¶
- #include <livestreamcontext.h>
Configuration context for live stream reader.
Public Members
-
const std::string url = {""}¶
Stream URL (e.g., “rtsp://camera.example.com/stream”)
-
const Slot slot = {1}¶
Slot identifying this stream.
-
int timeout_sec = {5}¶
Read timeout in seconds.
-
std::string rtsp_transport = {"tcp"}¶
RTSP transport (tcp/udp) for RTSP sources.
-
int max_delay_ms = {500}¶
Maximum allowed network delay for RTSP sources.
-
int reopen_retry_max = {-1}¶
Maximum number of reopen attempts: -1: infinite, 0: none, etc.
-
int timeout_start_sec = {5}¶
Start value for retry timeout.
-
int timeout_max_sec = {60}¶
Max value for retry timeout.
-
int factor = {2}¶
Factor to increase retry timeouts.
-
const std::string url = {""}¶
-
class LiveStreamThread : public Limef::thread::Thread<Limef::frame::NullFrame>¶
- #include <livestream.h>
Thread for reading live media streams using FFmpeg.
Handles live media sources like RTSP cameras using FFmpeg’s interrupt callback system for proper timeout handling. Sends CodecFrame on startup followed by PacketFrames containing media data.
Example protocols:
RTSP (rtsp://)
HTTP Live Streaming (http://)
UDP/RTP streams (udp://)
Public Functions
-
LiveStreamThread(std::string name, const LiveStreamContext &stream_ctx)¶
Construct a new Live Stream Reader Thread.
- Parameters:
name – Thread name for identification
stream_ctx – LiveStreamContext defining the live stream
- Throws:
std::runtime_error – if stream cannot be opened or initialized
-
struct MediaFileContext¶
- #include <mediafile0.h>
Public Members
-
const std::string filename = {""}¶
Path to media file.
-
const Slot slot = {1}¶
Slot identifying this stream.
-
int fps = {0}¶
Target fps for feeding frames. 0 = feed frames at infinite speed. -1 = feed frames at play speed.
-
int loop = {-1}¶
Loop the file from start at EOF. -1 = no looping, otherwise, the pause in milliseconds before looping.
-
const std::string filename = {""}¶
-
class MediaFileSignal : public Limef::signal::Signal¶
- #include <mediafile0.h>
Specific signals to MediaFileThread0.
-
class MediaFileThread : public Limef::thread::ComposeThread¶
- #include <mediafile.h>
A safe media playback thread.
Creates a chain of:
MediaFileThread1 for reading files and transcoding / checking them to some standard form
OrderedPacketBufferThread for DTS ordering and buffering
NOTE: MediaFileThread1 includes conversion to AAC
NOTE: thread starting order will be (2) -> (1)
MediaFileThread1 ->| OrderedPacketBufferThreadthread border
-
class MediaFileThread0 : public Limef::thread::ProducerThread¶
- #include <mediafile0.h>
Thread for reading media files using FFmpeg.
Reads packets from a media file and sends them downstream:
On startup, sends CodecFrame with stream information
Continuously sends PacketFrames with media data
Uses ProducerThread’s timing system to control packet output rate
Dumps media downstream as-is - no internal conversions
Subclassed by Limef::thread::MediaFileThread1
Public Functions
-
MediaFileThread0(std::string name, const MediaFileContext &ctx)¶
Construct a new Media File Reader Thread.
- Parameters:
name – Thread name for identification
ctx – MediaFileContext identifying the stream
- Throws:
std::runtime_error – if file cannot be opened or stream info cannot be read
-
virtual void handleSignalFrame(const Limef::frame::SignalFrame *signalFrame) override¶
Handles custom signals as per
MediaFileSignal
-
void resendCodecFrame()¶
frontend-method: tell thread to send CodecFrame again downstream
-
class MediaFileThread1 : public Limef::thread::MediaFileThread0¶
- #include <mediafile1.h>
MediaFileThread1.
Extends MediaFileThread0 to do some encoding and codec checks in order to ensure a “canonical” / standard stream for our framework At the moment, consists of the following:
All incoming audio is encoded into AAC
TODO: video codec checks?
Public Functions
-
inline virtual Limef::ff::FrameFilter &getOutput() override¶
Get thread’s output framefilter.
- Returns:
Reference to output framefilter
-
inline virtual void setLogLevel(spdlog::level::level_enum level) override¶
Set thread’s log level.
-
inline virtual void setLogFormat(const std::string &format) override¶
Set thread’s log format.
-
class OrderedPacketBufferThread : public Limef::thread::PacketBufferThread¶
- #include <orderedpacketbuffer.h>
Thread for buffering PacketFrames with DTS ordering.
Extends PacketBufferThread to use OrderedFrameFifo, which maintains video packets in DTS order. Otherwise methods identical to PacketBufferThread.
-
class PacketBufferThread : public Limef::thread::Thread<Limef::frame::PacketFrame>¶
- #include <packetbuffer.h>
Thread for buffering PacketFrames.
This thread immediately terminates the filter chain for PacketFrames by:
Making a copy of incoming PacketFrames using framefifo’s stack
Placing the copy into framefifo’s queue
Processing frames from the queue in its own thread context
Subclassed by Limef::thread::OrderedPacketBufferThread
Public Functions
-
PacketBufferThread(std::string name, const Limef::FrameFifoContext &ctx)¶
Construct a new Packet Buffer Thread.
- Parameters:
name – Thread name for identification
ctx – FrameFifoContext defining the framefifo parameters
-
struct PerSocketData¶
- #include <websocket.h>
-
template<typename T>
class PollThread : public Limef::thread::Thread<T>¶ - #include <pollthread.h>
Example thread demonstrating PollFrameFifo usage.
This thread uses PollFrameFifo instead of the regular FrameFifo, allowing the main loop to multiplex frame reading with other I/O operations (sockets, pipes, etc.) using poll().
Key differences from regular Thread:
Overrides createFrameFifo() to create PollFrameFifo
Overrides mainLoop() to use poll() instead of blocking read()
Provides getFifoFd() for subclasses to include in their poll set
Usage pattern for subclasses (e.g., RTSPServerThread):
void MyServerThread::mainLoop() { preRun(); sendSignal__(BasicSignal(BasicSignal::Operation::started)); std::vector<pollfd> fds; fds.push_back({listen_fd_, POLLIN, 0}); fds.push_back({getFifoFd(), POLLIN, 0}); while (!stop_requested_) { int timeout_ms = computeTaskTimeout(); poll(fds.data(), fds.size(), timeout_ms); // Handle incoming frames if (fds[1].revents & POLLIN) { while (auto* frame = pollFrameFifo()->tryRead()) { frameTask(frame); } } // Handle socket I/O if (fds[0].revents & POLLIN) { acceptClient(); } task(); } postRun(); }
-
class ProducerThread : public Limef::thread::Thread<Limef::frame::NullFrame>¶
- #include <producer.h>
A prototype frame producing thread (aka source)
An example and base class for
Threads that produce frames, i.e. for frame sources. This example implementation produces and sendsNullFrames to it’s output framefilter.Subclassed by Limef::thread::MediaFileThread0, Limef::thread::USBCameraThread
-
template<typename T>
class Thread : public Limef::thread::ThreadBase¶ - #include <thread.h>
A multithreading base class.
Thread class includes:
thread “frontend” methods: you are supposed to call these from the main thread.
thread “backend” methods: these are used internally by the thread after it has been spawned.
a single, unified fifo for handling incoming
Frames and commands, theFrameFifo.
Subclassed by Limef::thread::PollThread< Limef::frame::RTPPacketFrame >, Limef::thread::PollThread< Limef::frame::PacketFrame >, Limef::thread::PollThread< T >
Public Functions
-
Thread(std::string name, const Limef::FrameFifoContext &ctx)¶
Default ctor
- Parameters:
name – a unique name identifying this thread
ctx – a
FrameFifoContextdefining the framefifo
-
virtual void start() override¶
sets up thread and starts mainLoop
-
inline virtual bool isStarted() const override¶
Check if start has been called or not.
-
virtual void requestStop() override¶
has the thread been started or not?
request thread termination
-
virtual void waitStop() override¶
wait for thread termination
-
virtual void stop() override¶
combined requestStop() and waitStop()
-
virtual void setAffinity(int core_id) override¶
Binds the thread to a certain CPU.
-
std::unique_ptr<Limef::signal::Signal> read(std::chrono::microseconds timeout = 0us)¶
read a signal at the frontend. 0 means wait forever -> uses fifo_mutex_ & fifo_condition_
Read a signal at the thread frontend (main python thread) sent by the thread backend through an internal fifo. Example:
auto signal2 = read(); if (auto* basicSignal = signal2->as<SomeSignalClass>()) { }
-
inline virtual Limef::ff::FrameFilter &getInput() override¶
returns thread’s input
FrameFilter
-
inline virtual Limef::ff::FrameFilter &getOutput() override¶
returns thread’s output
FrameFilter
-
virtual void setLogLevel(spdlog::level::level_enum level) override¶
Sets thread logger’s loglevel.
-
virtual void setLogFormat(const std::string &format) override¶
Sets thread logger’s format.
-
virtual void shortLogFormat() override¶
Shortcut method for setting a nice minimal format for thread’s logger.
-
virtual spdlog::level::level_enum getLogLevel() const override¶
Get thread logger’s current log level.
-
class ThreadBase¶
- #include <threadbase.h>
Subclassed by Limef::thread::Thread< Limef::frame::RTPPacketFrame >, Limef::thread::Thread< Limef::frame::PacketFrame >, Limef::thread::Thread< Limef::frame::NullFrame >, Limef::thread::Thread< Limef::frame::RawFrame >, Limef::thread::ComposeThread, Limef::thread::Thread< T >
Public Functions
-
virtual void start() = 0¶
Start the thread.
Initializes internal structures and starts the thread’s main loop. Thread subclasses may perform additional initialization.
-
virtual bool isStarted() const = 0¶
Check if start has been called or not.
-
virtual void requestStop() = 0¶
Request thread termination.
Sends termination signal to the thread. Does not wait for completion.
-
virtual void waitStop() = 0¶
Wait for thread termination.
Blocks until the thread has completely stopped.
-
virtual void stop() = 0¶
Stop the thread.
Combines requestStop() and waitStop() for convenience.
-
virtual void setAffinity(int core_id) = 0¶
Bind thread to specific CPU core.
- Parameters:
core_id – CPU core number to bind to
-
virtual void setLogLevel(spdlog::level::level_enum level) = 0¶
Set thread’s log level.
-
virtual void setLogFormat(const std::string &format) = 0¶
Set thread’s log format.
-
virtual void shortLogFormat() = 0¶
Set a nice minimal format for thread’s logger.
-
virtual spdlog::level::level_enum getLogLevel() const = 0¶
Get thread’s current log level.
-
virtual Limef::ff::FrameFilter &getInput() = 0¶
Get thread’s input framefilter.
- Returns:
Reference to input framefilter
-
virtual Limef::ff::FrameFilter &getOutput() = 0¶
Get thread’s output framefilter.
- Returns:
Reference to output framefilter
-
virtual void start() = 0¶
-
struct USBCameraContext¶
- #include <usbcamera.h>
Configuration for USB camera capture.
Public Members
-
std::string device = {"/dev/video0"}¶
V4L2 device path.
-
Slot slot = {1}¶
Slot identifying this stream.
-
int width = {640}¶
Requested capture width.
-
int height = {480}¶
Requested capture height.
-
int fps = {30}¶
Requested frame rate.
-
AVPixelFormat output_format = {AV_PIX_FMT_NV12}¶
Output format after SwScale (NV12 for GPU)
-
std::string device = {"/dev/video0"}¶
-
class USBCameraThread : public Limef::thread::ProducerThread¶
- #include <usbcamera.h>
Thread for capturing frames from USB cameras.
Reads raw frames from a V4L2 device, converts them to the target pixel format using SwScale, and sends them downstream as DecodedFrames.
On startup, sends a StreamFrame with video stream information. Then continuously sends DecodedFrames with pixel data.
Public Functions
-
USBCameraThread(std::string name, const USBCameraContext &ctx)¶
Construct USB camera capture thread.
- Parameters:
name – Thread name for logging/identification
ctx – Camera configuration (device, resolution, format)
- Throws:
std::runtime_error – if device cannot be opened
-
USBCameraThread(std::string name, const USBCameraContext &ctx)¶
-
class WebSocketServerSignal : public Limef::signal::Signal¶
- #include <websocket.h>
Single signal class for WebSocketServerThread with signal type enum
-
class WebSocketServerThread : public Limef::thread::Thread<Limef::frame::RawFrame>¶
- #include <websocket.h>
Ultra-fast WebSocket server for streaming frag-MP4 video
Features:
Zero-copy streaming where possible
Lockfree hot path for frame handling
Automatic initialization segment caching (FTYP + MOOV)
Keyframe-synchronized client onboarding
Token-based stream access control
Multiple clients per stream
Automatic cleanup of dead connections
All incoming ftyp and moov packets per slot are cached
Complete walkthrough:
media_thread: class: MediaFileThread note: sends CodecFrame only once at thread startup fork: fmp4: class: FMP4FrameFilter internal: gate: class: GateFrameFilter note: | although initially closed, is always listening and caching CodecFrame(s). When gate is opened, passes immediately CodecFrame downstream transcode: class: AudioTranscodeFrameFilter note: transcodes to AAC and sends modified CodecFrame downstream muxer: class: MuxerFrameFilter note: | requires CodecFrame to start muxing for fmp4, CodecFrame is necessary so that ftyp and moov get emitted partitioner: class: MP4PartitionFrameFilter fork: websocket: class: WebSocketServerThread Suppose gate first closed. Once it is opened, sends CodecFrame immediately downstream, muxer is initialized, ftyp and moov are sent correctly downstream *
Public Functions
-
bool startServer(int port = 8080)¶
Start WebSocket server on specified port
-
bool setSlotUUID(Slot slot, const std::string &uuid)¶
Register UUID <-> Slot mapping Called when camera/stream is added to system
-
bool addStreamToken(const std::string &token, const std::vector<std::string> &allowed_uuids, int64_t expires_timestamp = 0)¶
Add streaming token with allowed UUIDs
- Parameters:
token – Access token string
allowed_uuids – List of stream UUIDs this token can access
expires_timestamp – Unix timestamp when token expires (0 = never)
-
inline bool isServerRunning() const¶
Get server status (lockfree)
-
class WebSocketStreamBuffer¶
- #include <websocket.h>
Helper class for per-stream data structures Handles initialization segment caching and client management