Valkka  1.6.1
OpenSource Video Management
ValkkaFSManager

Writing, reading and caching frames

The level 2 API Python class ValkkaFSManager, uses several level 1 API (core) Python class objects

  • core.ValkkaFS : blocktable and book-keeping
  • core.ValkkaFSReaderThread : reads frames from the file or block device
  • core.ValkkaFSCacherThread : caches the read frames into memory (typically several blocks of frames)
  • core.ValkkaFSWriterThread : writes frames into the file or block device
  • Frames are requested on per-block basis from core.ValkkaFSReaderThread. It feeds frames to core.ValkkaFSCacherThread
  • Seek, play and stop operations take place within the cached frames in core.ValkkaFSCacherThread
  • All Threads share a common core.ValkkaFS object that has the blocktable and is also visible at the Python side

The logic of requesting certain blocks in order to show (and buffer) frames for a certain time instant is handled completely at the python side

This orchestration is handled by the level 2 API Python class ValkkaFSManager.

Let's use the following pseudocode notation, to see how objects are contained within other objects:

classname(init parameter) {
    classnames of contained objects
}

This is how it looks like. Let's hope you'll get the big picture. :)

- cpp threads run & originate python callbacks
- core.ValkkaFSWriterThread "drives" core.ValkkaFS which emits callbacks

api2.ValkkaFSManager(api2.ValkkaFS) {
    
    1: api2.ValkkaFS {
        core.ValkkaFS
        
        # c++ => python callbacks
        def new_block_cb__(propagate, par):
            - launched from cpp core.ValkkaFS.writeBlock (pycall) when a block is finished
            - propagate indicates if further callbacks should be evoked
            - par is an integer (block number) or an error string
            - calls self.block_cb for callback propagation

        # python => c++ calls
        def getBlockTable():
            - updates python side blocktable (numpy array)
            - calls cpp-side core.ValkkaFS.setArrayCall(self.blocktable_)
            - returns self.blocktable_
        }
            
    2: core.ValkkaFSReaderThread {
        core.ValkkaFS
        - writes to framefilter that is got from core.FileCacherThread.getFrameFilter() [4]
        - frames are requested on per-block basis

        # python => c++ calls
        pullBlocksPyCall(block_list) => [signal to thread]
            => pullBlocks
                - Writes frames to its outgoing framefilter 
                 (typically connected to FileCacherThread::getFrameFilter())
                - results in launching core.FileCacherThread.switchCache => pyfunc2
                  => timeLimitsCallback__
        }

    3: core.ValkkaFSWriterThread {
        core.ValkkaFS
        - input framefilter can be requested with getFrameFilter()
        }

    4: core.FileCacherThread {
        - gets frames from core.ValkkaFSReaderThread via input framefilter
        - caches frames
        - receives seek, play, stop, operations
        - send batches of frames downstream (to output filter)
        }   
    
    # c++ => python callbacks
    def timeCallback__(mstime: int):
        - originates from core.FileCacherThread.run (pyfunc)
        - once per 300 ms
        - calls:
            => self.readBlockTableIf()
                => if necessary, calls self.readBlockTable()
                    => self.blocktable = api2.ValkkaFS.getBlockTable() [1]
                        => cpp core.ValkkaFS.setArrayCall(self.blocktable_)

            => self.reqBlocks(mstimestamp)
                => core.ValkkaFSReaderThread.pullBlocksPyCall(block_list)

    def timeLimitsCallback__(tup: tuple):
        - originates from core.FileCacherThread.switchCache (pyfunc2)
        - sent when frame cache has been updated
    
    
    # some important methods:

    def setBlockCallback(cb):
        define how api2.ValkkaFS.new_block_cb__ is continued
        (by default, no callback chain)
    
    def setOutput(_id, slot, framefilter [**]):
        """Set id => slot mapping.  Set output framefilter
        """
        core.ValkkaFSReaderThread.setSlotIdCall(slot, _id)  # ID-TO-SLOT MAPPING
        ctx = core.FileStreamContext(slot, framefilter)     # SLOT-TO-FRAMEFILTER MAPPING [**]
        core.FileCacherThread.registerStreamCall(ctx)

    def setInput(_id, slot):
        core.ValkkaFSWriterThread.setSlotIdCall(_id, slot)

    def getInputFrameFilter():
        return ValkkaFSWriterThread.getFrameFilter()
    
    }

Frames are transported like this:

outgoing frames:
    
    core.ValkkaFSReaderThread [2] --> core.FileCacherThread [4] --> output framefilter [**]
    
     - Request blocks of frames        - Set seek point, play,
       to be sent downstream             stop, etc. 
     - Uses shared core.ValkkaFS      
       instance
    
    
incoming frames:

    --> core.ValkkaFSWriterThread.getFrameFilter() --> core.ValkkaFSWriterThread
                                                       
                                                       - Updates shared core.ValkkaFS
                                                         instance