Valkka  1.6.1
OpenSource Video Management
livethread.h
Go to the documentation of this file.
1 #ifndef LIVETHREAD_HEADER_GUARD
2 #define LIVETHREAD_HEADER_GUARD
3 /*
4  * livethread.h : A live555 thread
5  *
6  * (c) Copyright 2017-2024 Sampsa Riikonen
7  *
8  * Authors: Sampsa Riikonen <sampsa.riikonen@iki.fi>
9  *
10  * This file is part of the Valkka library.
11  *
12  * Valkka is free software: you can redistribute it and/or modify
13  * it under the terms of the GNU Lesser General Public License as
14  * published by the Free Software Foundation, either version 3 of the
15  * License, or (at your option) any later version.
16  *
17  * This program is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20  * GNU Lesser General Public License for more details.
21  *
22  * You should have received a copy of the GNU Lesser General Public License
23  * along with this program. If not, see <https://www.gnu.org/licenses/>
24  *
25  */
26 
37 #include "live.h"
38 #include "liveserver.h"
39 #include "thread.h"
40 #include "framefilter.h"
41 #include "framefifo.h"
42 #include "event.h"
43 
44 
45 void setLiveOutPacketBuffermaxSize(unsigned i); // <pyapi>
46 
47 
57 class LiveFifo : public FrameFifo {
58 
59 public:
61  LiveFifo(const char* name, FrameFifoContext ctx);
63  ~LiveFifo();
64 
65 protected:
66  void* live_thread;
67 
68 public:
69  void setLiveThread(void* live_thread);
70  bool writeCopy(Frame* f, bool wait=false);
71 };
72 
73 
80 enum class LiveConnectionType { // <pyapi>
81  none, // <pyapi>
82  rtsp, // <pyapi>
83  sdp // <pyapi>
84 }; // <pyapi>
85 
86 
97 struct LiveConnectionContext { // <pyapi>
99  LiveConnectionContext(LiveConnectionType ct, std::string address, SlotNumber slot, // <pyapi>
100  FrameFilter* framefilter) : // <pyapi>
102  request_multicast(false), request_tcp(false), recv_buffer_size(0), reordering_time(0), // <pyapi>
103  time_correction(TimeCorrectionType::smart) // <pyapi>
104  {} // <pyapi>
106  LiveConnectionContext() : // <pyapi>
107  connection_type(LiveConnectionType::none), address(""), slot(0), framefilter(NULL), msreconnect(10000), // <pyapi>
108  request_multicast(false), request_tcp(false),time_correction(TimeCorrectionType::smart) // <pyapi>
109  {} // <pyapi>
111  std::string address;
112  SlotNumber slot;
114  long unsigned int msreconnect;
116  bool request_tcp;
117  unsigned recv_buffer_size;
118  unsigned reordering_time;
120 }; // <pyapi>
121 
122 
127 struct LiveOutboundContext { // <pyapi>
128  LiveOutboundContext(LiveConnectionType ct, std::string address, SlotNumber slot, // <pyapi>
129  unsigned short int portnum) : // <pyapi>
130  connection_type(ct), address(address), slot(slot), portnum(portnum), ttl(225) // <pyapi>
131  {} // <pyapi>
132  LiveOutboundContext() : // <pyapi>
133  connection_type(LiveConnectionType::none), address(""), slot(0), portnum(0), ttl(255) // <pyapi>
134  {} // <pyapi>
136  std::string address;
137  SlotNumber slot;
138  unsigned short int portnum;
139  unsigned char ttl;
140 }; // <pyapi>
141 
142 
143 
149 enum class LiveSignal {
150  none,
151  exit,
152  // inbound streams
153  register_stream,
154  deregister_stream,
155  play_stream,
156  stop_stream,
157  // outbound streams
158  register_outbound,
159  deregister_outbound
160 };
161 
162 
167  LiveSignal signal;
168  LiveConnectionContext *connection_context;
169  LiveOutboundContext *outbound_context;
170 };
171 
172 
173 
185 class Connection {
186 
187 public:
195  Connection(UsageEnvironment& env, LiveConnectionContext& ctx);
196  virtual ~Connection();
197 
198 protected:
200 
201  // internal framefilter chain.. if we'd like to modify the frames before they are passed to the API user
202  // more framefilters could be generated here and initialized in the constructor init list
203  // the starting filter should always be named as "inputfilter" .. this is where Live555 writes the frames
204  // TimestampFrameFilter2 timestampfilter; ///< Internal framefilter: correct timestamp
205  // SlotFrameFilter inputfilter; ///< Internal framefilter: set slot number
206 
207  FrameFilter* timestampfilter;
208  FrameFilter* inputfilter;
210 
211  long int frametimer;
212  long int pendingtimer;
213 
214 public:
215  UsageEnvironment &env;
216  bool is_playing;
217 
218 public:
219  virtual void playStream() =0;
220  virtual void stopStream(bool cut = true) =0;
221  virtual void reStartStream();
222  virtual void reStartStreamIf();
223  virtual bool isClosed();
224  virtual void forceClose();
225  SlotNumber getSlot();
226 };
227 
228 
237 class Outbound { // will leave this quite generic .. don't know at this point how the rtsp server is going to be // analogy: AVThread
238 
239 public:
240  Outbound(UsageEnvironment& env, FrameFifo& fifo, LiveOutboundContext& ctx);
241  virtual ~Outbound();
242 
243 public: // init'd at constructor time
245  UsageEnvironment &env;
247 
248 protected:
249  bool setup_ok, at_setup;
250 
251 public:
252  virtual void reinit();
253  virtual void handleFrame(Frame *f);
254 };
255 
256 
263 class RTSPConnection : public Connection {
264 
265 public:
267  RTSPConnection(UsageEnvironment& env, LiveConnectionContext& ctx);
268  ~RTSPConnection();
269  // RTSPConnection(const RTSPConnection& cp); ///< Copy constructor .. nopes, default copy constructor good enough
270 
271 
272 private:
275  bool termplease;
276 
277 public:
278  void playStream();
279  void stopStream(bool cut = true);
280  void reStartStreamIf();
281  bool isClosed();
282  void forceClose();
283 };
284 
285 
290 class SDPConnection : public Connection {
291 
292 public:
294  SDPConnection(UsageEnvironment& env, LiveConnectionContext& ctx);
296  ~SDPConnection();
297 
298 private:
299  StreamClientState *scs;
300  bool passthrough;
301 
302 public:
303  void playStream();
304  void stopStream(bool cut = true);
305 
306 };
307 
308 
317 class SDPOutbound : public Outbound {
318 
319 public:
320  SDPOutbound(UsageEnvironment &env, FrameFifo &fifo, LiveOutboundContext& ctx);
321  ~SDPOutbound();
322 
323 public: // virtual redefined
324  void reinit();
325  void handleFrame(Frame *f);
326 
327 public:
328  std::vector<Stream*> streams;
329 };
330 
331 
340 class RTSPOutbound : public Outbound {
341 
342 public:
343  RTSPOutbound(UsageEnvironment &env, RTSPServer &server, FrameFifo &fifo, LiveOutboundContext& ctx);
344  ~RTSPOutbound();
345 
346 protected: // init'd at constructor time
347  RTSPServer &server;
348 
349 public: // virtual redefined
350  void reinit();
351  void handleFrame(Frame *f);
352 
353 public:
354  ServerMediaSession *media_session;
355  std::vector<ValkkaServerMediaSubsession*> media_subsessions;
356 
357 };
358 
359 
360 
372 class LiveThread : public Thread { // <pyapi>
373 
374 public:
375  static void periodicTask(void* cdata);
376 
377 public: // <pyapi>
384  LiveThread(const char* name, FrameFifoContext fifo_ctx=FrameFifoContext()); // <pyapi>
385  ~LiveThread(); // <pyapi>
386 
387 protected: // frame input
390 
391 protected: // redefinitions
392  std::deque<LiveSignalContext> signal_fifo;
393 
394 protected:
395  TaskScheduler* scheduler;
396  UsageEnvironment* env;
398  std::vector<Connection*> slots_;
399  std::vector<Outbound*> out_slots_;
400  std::list<Connection*> pending;
402  EventTriggerId event_trigger_id_hello_world;
403  EventTriggerId event_trigger_id_frame_arrived;
404  EventTriggerId event_trigger_id_got_frames;
405  int fc;
406 
407 protected: // rtsp server for live and/or recorded stream
408  UserAuthenticationDatabase *authDB;
409  RTSPServer *server;
410 
411 public: // redefined virtual functions
412  void run();
413  void preRun();
414  void postRun();
416  void sendSignal(LiveSignalContext signal_ctx);
417 
418 protected:
419  void handlePending();
420  void checkAlive();
421  void closePending();
422  void handleSignals();
423  void handleFrame(Frame* f);
424 
425 private: // internal
426  int safeGetSlot (SlotNumber slot, Connection*& con);
427  int safeGetOutboundSlot (SlotNumber slot, Outbound*& outbound);
428  // inbound streams
429  void registerStream (LiveConnectionContext &connection_ctx);
430  void deregisterStream (LiveConnectionContext &connection_ctx);
431  void playStream (LiveConnectionContext &connection_ctx);
432  // outbound streams
433  void registerOutbound (LiveOutboundContext &outbound_ctx);
434  void deregisterOutbound (LiveOutboundContext &outbound_ctx);
435  // thread control
436  void stopStream (LiveConnectionContext &connection_ctx);
437 
438 public: // *** C & Python API *** .. these routines go through the condvar/mutex locking // <pyapi>
439  // inbound streams
440  void registerStreamCall (LiveConnectionContext &connection_ctx);
441  void deregisterStreamCall (LiveConnectionContext &connection_ctx);
442  void playStreamCall (LiveConnectionContext &connection_ctx);
443  void stopStreamCall (LiveConnectionContext &connection_ctx);
444  // outbound streams
445  void registerOutboundCall (LiveOutboundContext &outbound_ctx);
446  void deregisterOutboundCall (LiveOutboundContext &outbound_ctx);
447  // thread control
448  void requestStopCall();
449  // LiveFifo &getFifo(); ///< API method: get fifo for sending frames with live555 // <pyapi>
451  void setRTSPServer(int portnum=8554);
452  virtual void waitReady();
453 
454 public: // live555 events and tasks
455  static void helloWorldEvent(void* clientData);
456  static void frameArrivedEvent(void* clientData);
457  static void gotFramesEvent(void* clientData);
458  static void readFrameFifoTask(void* clientData);
459 
460 public:
461  void testTrigger();
462  void triggerGotFrames();
463 }; // <pyapi>
464 
465 #endif
A base class that unifies all kinds of connections (RTSP and SDP).
Definition: livethread.h:185
virtual void reStartStream()
Called from within the live555 event loop.
Definition: livethread.cpp:151
virtual bool isClosed()
Have the streams resources been reclaimed after stopping it?
Definition: livethread.cpp:163
virtual ~Connection()
Default destructor.
Definition: livethread.cpp:145
LiveConnectionContext & ctx
LiveConnectionContext identifying the stream source (address), it's destination (slot and target fram...
Definition: livethread.h:199
virtual void stopStream(bool cut=true)=0
Stops stream and reclaims it resources. Called from within the live555 event loop....
virtual void forceClose()
Normally, stopStream reclaims the resources. This one forces the delete.
Definition: livethread.cpp:167
virtual void reStartStreamIf()
Called from within the live555 event loop.
Definition: livethread.cpp:156
virtual void playStream()=0
Called from within the live555 event loop.
long int pendingtimer
Measures how long stream has been pending.
Definition: livethread.h:212
UsageEnvironment & env
UsageEnvironment identifying the Live555 event loop (see Live555 primer)
Definition: livethread.h:215
SlotNumber getSlot()
Return the slot number.
Definition: livethread.cpp:159
long int frametimer
Measures time when the last frame was received.
Definition: livethread.h:211
FrameFilter * repeat_sps_filter
Repeat sps & pps packets before i-frame (if they were not there before the i-frame)
Definition: livethread.h:209
Passes frames to a FrameFifo.
Definition: framefilter.h:530
A thread-safe combination of a fifo (first-in-first-out) queue and an associated stack.
Definition: framefifo.h:83
FrameFifoContext ctx
Parameters defining the stack and overflow behaviour.
Definition: framefifo.h:93
The mother class of all frame filters! FrameFilters are used to create "filter chains".
Definition: framefilter.h:44
Frame: An abstract queueable class.
Definition: frame.h:112
This is a special FrameFifo class for feeding frames into live555, i.e.
Definition: livethread.h:57
LiveFifo(const char *name, FrameFifoContext ctx)
Default constructor.
Definition: livethread.cpp:50
~LiveFifo()
Default virtual destructor.
Definition: livethread.cpp:53
bool writeCopy(Frame *f, bool wait=false)
Take a frame "ftmp" from the stack, copy contents of "f" into "ftmp" and insert "ftmp" into the begin...
Definition: livethread.cpp:62
Live555, running in a separate thread.
Definition: livethread.h:372
static void helloWorldEvent(void *clientData)
For testing/debugging
Definition: livethread.cpp:1377
FifoFrameFilter infilter
A FrameFilter for writing incoming frames.
Definition: livethread.h:389
void closePending()
Force close all pending connections.
Definition: livethread.cpp:896
static void readFrameFifoTask(void *clientData)
This task registers itself if there are frames in the fifo.
Definition: livethread.cpp:1405
int fc
debugging: incoming frame counter
Definition: livethread.h:405
void handleSignals()
Handle pending signals in the signals queue. Used by LiveThread::periodicTask.
Definition: livethread.cpp:906
void checkAlive()
Used by LiveThread::periodicTask.
Definition: livethread.cpp:862
char eventLoopWatchVariable
Modifying this, kills the Live555 event loop.
Definition: livethread.h:397
static void frameArrivedEvent(void *clientData)
For debugging.
Definition: livethread.cpp:1383
void deregisterOutboundCall(LiveOutboundContext &outbound_ctx)
API method: deregister outbound stream // <pyapi>
Definition: livethread.cpp:1284
static void gotFramesEvent(void *clientData)
Triggered when an empty fifo gets a frame. Schedules readFrameFifoTask. See Live streaming.
Definition: livethread.cpp:1396
void preRun()
Called before entering the main execution loop, but after creating the thread.
Definition: livethread.cpp:849
void triggerGotFrames()
See Live streaming.
Definition: livethread.cpp:1442
LiveFifo infifo
A FrameFifo for incoming frames.
Definition: livethread.h:388
std::vector< Connection * > slots_
A constant sized vector. Book-keeping of the connections (RTSP or SDP) currently active in the live55...
Definition: livethread.h:398
std::vector< Outbound * > out_slots_
Book-keeping for the outbound connections.
Definition: livethread.h:399
UsageEnvironment * env
Live555 UsageEnvironment identifying the event loop.
Definition: livethread.h:396
TaskScheduler * scheduler
Live555 event loop TaskScheduler.
Definition: livethread.h:395
std::deque< LiveSignalContext > signal_fifo
Redefinition of signal fifo (Thread::signal_fifo becomes hidden)
Definition: livethread.h:392
void handleFrame(Frame *f)
Handle incoming frames. See Live streaming.
Definition: livethread.cpp:966
static void periodicTask(void *cdata)
Used to (re)schedule LiveThread methods into the live555 event loop.
Definition: livethread.cpp:1227
void registerOutboundCall(LiveOutboundContext &outbound_ctx)
API method: register outbound stream // <pyapi>
Definition: livethread.cpp:1278
void registerStreamCall(LiveConnectionContext &connection_ctx)
API method: registers a stream // <pyapi>
Definition: livethread.cpp:1256
bool exit_requested
Exit asap.
Definition: livethread.h:401
void setRTSPServer(int portnum=8554)
API method: activate the RTSP server at port portnum // <pyapi>
Definition: livethread.cpp:1326
void run()
Main execution loop is defined here.
Definition: livethread.cpp:987
FifoFrameFilter & getFrameFilter()
API method: get filter for sending frames with live555 // <pyapi>
Definition: livethread.cpp:1321
virtual void waitReady()
API method: wait until all signals and pending connections are resolved // <pyapi>
Definition: livethread.cpp:1304
void handlePending()
Try to close streams that were not properly closed (i.e. idling for the tcp socket while closing)....
Definition: livethread.cpp:876
void stopStreamCall(LiveConnectionContext &connection_ctx)
API method: stops playing the stream and feeding frames // <pyapi>
Definition: livethread.cpp:1272
void testTrigger()
See Live streaming.
Definition: livethread.cpp:1436
void deregisterStreamCall(LiveConnectionContext &connection_ctx)
API method: de-registers a stream // <pyapi>
Definition: livethread.cpp:1261
void postRun()
Called after the main execution loop exits, but before joining the thread.
Definition: livethread.cpp:853
void sendSignal(LiveSignalContext signal_ctx)
Send a signal to the thread.
Definition: livethread.cpp:856
void playStreamCall(LiveConnectionContext &connection_ctx)
API method: starts playing the stream and feeding frames // <pyapi>
Definition: livethread.cpp:1267
void requestStopCall()
API method: Like Thread::stopCall() but does not block // <pyapi>
Definition: livethread.cpp:1291
LiveThread(const char *name, FrameFifoContext fifo_ctx=FrameFifoContext())
Default constructor.
Definition: livethread.cpp:790
std::list< Connection * > pending
Incoming connections pending for closing.
Definition: livethread.h:400
A base class that unifies all kinds of outgoing streams (i.e.
Definition: livethread.h:237
Outbound(UsageEnvironment &env, FrameFifo &fifo, LiveOutboundContext &ctx)
Default constructor.
Definition: livethread.cpp:174
LiveOutboundContext & ctx
Identifies the connection type, stream address, etc. See LiveOutboundContext.
Definition: livethread.h:244
virtual void reinit()
Reset session and subsessions.
Definition: livethread.cpp:177
bool at_setup
Flags used by Outbound::handleFrame.
Definition: livethread.h:249
virtual ~Outbound()
Default virtual destructor.
Definition: livethread.cpp:175
UsageEnvironment & env
Identifies the live555 event loop.
Definition: livethread.h:245
virtual void handleFrame(Frame *f)
Setup session and subsessions, writes payload.
Definition: livethread.cpp:183
FrameFifo & fifo
Outgoing fFrames are being read and finally recycled here.
Definition: livethread.h:246
A negotiated RTSP connection.
Definition: livethread.h:263
void forceClose()
Normally, stopStream reclaims the resources. This one forces the delete.
Definition: livethread.cpp:421
void reStartStreamIf()
Restarts the stream if no frames have been received for a while.
Definition: livethread.cpp:329
RTSPConnection(UsageEnvironment &env, LiveConnectionContext &ctx)
Default constructor.
Definition: livethread.cpp:258
void playStream()
Uses ValkkaRTSPClient instance to initiate the RTSP negotiation.
Definition: livethread.cpp:271
LiveStatus livestatus
Reference of this variable is passed to ValkkaRTSPClient. We can see outside of the live555 callback ...
Definition: livethread.h:274
ValkkaRTSPClient * client
ValkkaRTSPClient defines the behaviour (i.e. event registration and callbacks) of the RTSP client (se...
Definition: livethread.h:273
bool termplease
Ref of this var is passed to ValkkaRTSPClient. When set to true, ValkkaRTSPClient should terminate it...
Definition: livethread.h:275
void stopStream(bool cut=true)
Uses ValkkaRTSPClient instance to shut down the stream.
Definition: livethread.cpp:292
bool isClosed()
Have the streams resources been reclaimed?
Definition: livethread.cpp:417
Sending a stream using the on-demand rtsp server.
Definition: livethread.h:340
RTSPServer & server
Reference to the RTSPServer instance.
Definition: livethread.h:347
void handleFrame(Frame *f)
Setup session and subsessions, writes payload.
Definition: livethread.cpp:676
void reinit()
Reset session and subsessions.
Definition: livethread.cpp:643
Connection is is defined in an SDP file.
Definition: livethread.h:290
void playStream()
Creates Live555 MediaSessions, MediaSinks, etc. instances and registers them directly to the Live555 ...
Definition: livethread.cpp:434
SDPConnection(UsageEnvironment &env, LiveConnectionContext &ctx)
Default constructor.
Definition: livethread.cpp:428
~SDPConnection()
Default destructor.
Definition: livethread.cpp:431
void stopStream(bool cut=true)
Closes Live555 MediaSessions, MediaSinks, etc.
Definition: livethread.cpp:530
Sending a stream without rtsp negotiation (i.e.
Definition: livethread.h:317
std::vector< Stream * > streams
SubStreams of the outgoing streams (typically two, e.g. video and sound)
Definition: livethread.h:328
void handleFrame(Frame *f)
Setup session and subsessions, writes payload.
Definition: livethread.cpp:563
void reinit()
Reset session and subsessions.
Definition: livethread.cpp:551
Class to hold per-stream state that we maintain throughout each stream's lifetime.
Definition: live.h:78
A class for multithreading with a signaling system.
Definition: thread.h:87
std::string name
Name of the thread.
Definition: thread.h:116
Handles a live555 RTSP connection.
Definition: live.h:111
TimeCorrectionType
Methods to correct frame timestamps.
Definition: frame.h:87
Thread safe system of fifo and a stack.
Definition of FrameFilter and derived classes for various purposes.
LiveStatus
Status for the ValkkaRTSPClient.
Definition: live.h:62
LiveConnectionType
LiveThread connection types.
Definition: livethread.h:80
Connection(UsageEnvironment &env, LiveConnectionContext &ctx)
Default constructor.
Definition: livethread.cpp:123
Interface to live555.
Live555 interface for server side: streaming to udp sockets directly or by using an on-demand rtsp se...
LiveSignal
Characteristic signals for the Live555 thread.
Definition: livethread.h:149
Describes the stack structure and fifo behaviour for a FrameFifo.
Definition: framefifo.h:56
Identifies a stream and encapsulates information about the type of connection the user is requesting ...
Definition: livethread.h:97
TimeCorrectionType time_correction
How to perform frame timestamp correction // <pyapi>
Definition: livethread.h:119
LiveConnectionContext(LiveConnectionType ct, std::string address, SlotNumber slot, FrameFilter *framefilter)
Default constructor.
Definition: livethread.h:99
std::string address
Stream address // <pyapi>
Definition: livethread.h:111
SlotNumber slot
A unique stream slot that identifies this stream // <pyapi>
Definition: livethread.h:112
long unsigned int msreconnect
If stream has delivered nothing during this many milliseconds, reconnect // <pyapi>
Definition: livethread.h:114
unsigned reordering_time
Live555 packet reordering treshold time (microsecs) // <pyapi>
Definition: livethread.h:118
LiveConnectionType connection_type
Identifies the connection type // <pyapi>
Definition: livethread.h:110
bool request_multicast
Request multicast in the rtsp negotiation or not // <pyapi>
Definition: livethread.h:115
bool request_tcp
Request interleaved rtsp streaming or not // <pyapi>
Definition: livethread.h:116
FrameFilter * framefilter
The frames are feeded into this FrameFilter // <pyapi>
Definition: livethread.h:113
unsigned recv_buffer_size
Operating system ringbuffer size for incoming socket // <pyapi>
Definition: livethread.h:117
LiveConnectionContext()
Dummy constructor : remember to set member values by hand.
Definition: livethread.h:106
Same as LiveConnectionContext, but for outbound streams (i.e.
Definition: livethread.h:127
SlotNumber slot
A unique stream slot that identifies this stream // <pyapi>
Definition: livethread.h:137
unsigned short int portnum
Start port number (for sdp) // <pyapi>
Definition: livethread.h:138
std::string address
Stream address // <pyapi>
Definition: livethread.h:136
unsigned char ttl
Packet time-to-live // <pyapi>
Definition: livethread.h:139
LiveConnectionType connection_type
Identifies the connection type // <pyapi>
Definition: livethread.h:135
Identifies the information the signals LiveThread::Signals carry.
Definition: livethread.h:166
Base class for multithreading.
@ none
undefined (initial value)
Definition: usbthread.h:143