Source code for valkka.multiprocess.sync

import traceback
from multiprocessing import Event
# from valkka import core # nopes

class NotEnoughEvents(BaseException):
    pass

[docs] class EventGroup: """Creates a group of multiprocessing events :param n: number of events to be instantiated and cached :param event_class: a multiprocessing event class that has ``set`` and ``clear`` methods. default: python ``multiprocessing.Event``. Can also be ``EventFd`` from libValkka. """ def __init__(self, n = 10, event_class = Event): self.events = [] # list of cached events: immutable self.index = [] # list of indexes of available events: mutable for i in range(n): self.events.append(event_class()) self.index.append(i) def __str__(self): st = "<EventGroup: " for i in range(len(self.events)): if i in self.index: st += "f"+str(i)+" " else: st += "R"+str(i)+" " st += ">" return st def __len__(self): return len(self.events)
[docs] def set(self, i): """Set / trigger an event at index i. Used typically at multiprocessing backend. """ self.events[i].set()
[docs] def reserve(self) -> tuple: """Reserve and return an Event instance together with its index: ``index, Event`` Use typically at process frontend / python main process """ try: index = self.index.pop(0) except IndexError as e: raise NotEnoughEvents event = self.events[index] # event.clear() # clear event before using it """..woops: simply calling clear before calling set on an EventFd will crash the program with "Resource temporarily unavailable" """ event.set() event.clear() return index, event
[docs] def release(self, event): """Release an EventFd sync primitive. Use typically at process frontend / python main process :param event: event to be released / returned """ try: index = self.events.index(event) except ValueError: # trying to return an event that's not in this EventGroup raise ValueError("event not in this Eventgroup") self.index.append(index)
[docs] def release_ind(self, index: int): """Release an EventFd sync primitive, based on the index. Use typically at process frontend / python main process :param index: event's index """ self.index.append(index)
[docs] def fromIndex(self, i): """Get an event, based on the event index. Use typically at multiprocessing backend to get the corresponding event as in the frontend. """ return self.events[i]
[docs] def asIndex(self, event): """Return index corresponding to an event """ return self.events.index(event)
[docs] class SyncIndex: """A context manager for synchronizing between multiprocessing front- and backend. :param event_group: an EventGroup instance Wait's and releases an event at context manager exit """ def __init__(self, event_group: EventGroup): self.eg = event_group self.event = None def __enter__(self): i, self.event = self.eg.reserve() return i def __exit__(self, type, value, tb): if tb: print("SyncIndex failed with:") traceback.print_tb(tb) self.event.wait() # wait until the event has been set self.eg.release(self.event) # recycle the event
def main1(): # raise(NotEnoughEvents) # eg = EventGroup(0) eg = EventGroup(1) with SyncIndex(eg) as i: print("waiting ", i) print(eg) # kokkelis def main2(): # g = EventFdGroup(10) g = EventFdGroup(1) print(g) ind1, e1 = g.reserve() print(">>",ind1, e1) print(g) ind2, e2 = g.reserve() print(">>",ind2, e2) print(g) print("->",g.fromIndex(ind2)) g.release(e1) print(g) if __name__ == "__main__": # main1() main2()