
Waiting on multiple Event objects in Python 3


Passive waiting is a way of suspending a program's execution without consuming any CPU (in contrast with active waiting, where a program is busy doing nothing). In Python, you can use certain classes from the threading module for this (such as Condition and Event). Take the Event class, for example. It is good for waiting for a single event. But what if you need to wait for many events? You might say, "Why not make a list of multiple Event objects and wait for all of them?" Well… that is not going to work:

import threading, collections

timeout = 5  # seconds; use None to wait without a timeout
events=[threading.Event() for _ in range(10)]  # make 10 events
collections.deque((e.clear() for e in events), 0)  # clear all event flags
activated={e for e in events if e.wait(timeout)}  # wait for events, get a set of activated ones

There are two problems with this kind of code:

  • If you use a timeout value, you degrade it to active waiting, because you have to keep re-checking the events in a loop (and each pass still waits until the last event in the list gets activated or times out).
  • If you don't use a timeout value, the code gets stuck on the first event (and then the second, third, etc.).
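
The first problem is easy to measure. The following sketch (the 0.2 s timeout and the three-event list are arbitrary values chosen for illustration) sets only the last event up front, yet the comprehension still spends a full timeout on each event ahead of it:

```python
import threading
import time

timeout = 0.2  # seconds; arbitrary value for illustration
events = [threading.Event() for _ in range(3)]
events[-1].set()  # only the last event is already set

start = time.monotonic()
# Each wait() runs to completion before the next one starts.
activated = {e for e in events if e.wait(timeout)}
elapsed = time.monotonic() - start

# The set event is found only after the two unset events have timed out.
print(f"found {len(activated)} event(s) after {elapsed:.2f}s")
```

The delay grows with the number of unset events that come before the one you care about.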

If you have programmed in C/C++, you might want to use a select function. select allows you to wait for several I/O operations and returns when any of them completes. The good news is that there is such a function in Python 3. Concretely, it is the select method of the selectors.BaseSelector class. The bad news is that you can wait only on file objects.
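
For comparison, this is roughly what selecting on ordinary file objects looks like. The sketch uses socket.socketpair() to get connected sockets without touching the network; the variable names and the "a"/"b" labels are made up for the example.

```python
import selectors
import socket

sel = selectors.DefaultSelector()

# Two independent connected socket pairs; we only watch the receiving ends.
a_send, a_recv = socket.socketpair()
b_send, b_recv = socket.socketpair()
sel.register(a_recv, selectors.EVENT_READ, "a")
sel.register(b_recv, selectors.EVENT_READ, "b")

b_send.send(b"x")  # make only "b" readable

# select() returns as soon as any registered object is ready.
ready = [key.data for key, mask in sel.select(timeout=1)]
print(ready)

sel.close()
for s in (a_send, a_recv, b_send, b_recv):
    s.close()
```

Only the object that actually has data waiting is reported, and the call does not have to visit the registered objects one by one.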

I don't know about you, but I'd like to wait on other things, such as MULTIPLE EVENT OBJECTS! But you can't, because those are not file objects.

Do you wish there was a way to wait on those Event objects? There is :-). Here's a slightly modified version of the code that works in Python 3 (the original code is here):

import os
import select

class WaitableEvent:
    """
    Provides an abstract object that can be used to resume select loops with
    indefinite waits from another thread or process. This mimics the standard
    threading.Event interface.
    """

    def __init__(self):
        self._read_fd, self._write_fd = os.pipe()

    def wait(self, timeout=None):
        rfds, wfds, efds = select.select([self._read_fd], [], [], timeout)
        return self._read_fd in rfds

    def isSet(self):
        return self.wait(0)

    def clear(self):
        if self.isSet():
            os.read(self._read_fd, 1)

    def set(self):
        if not self.isSet():
            os.write(self._write_fd, b'1')

    def fileno(self):
        """
        Return the FD number of the read side of the pipe, allows this object to
        be used with select.select().
        """
        return self._read_fd

    def __del__(self):
        os.close(self._read_fd)
        os.close(self._write_fd)
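
One caveat: on Windows, select only accepts sockets, so the pipe-based class above will not work there. A possible variant, sketched here under the assumption that socket.socketpair() is available (on Windows that means Python 3.5 or newer), swaps the pipe for a pair of connected sockets. The class name SocketWaitableEvent and its close method are made up for this example and are not part of the original code.

```python
import select
import socket


class SocketWaitableEvent:
    """Hypothetical socket-based variant of WaitableEvent (a sketch)."""

    def __init__(self):
        # Two connected sockets stand in for the two ends of the pipe.
        self._recv_sock, self._send_sock = socket.socketpair()

    def wait(self, timeout=None):
        rfds, _, _ = select.select([self._recv_sock], [], [], timeout)
        return bool(rfds)

    def is_set(self):
        return self.wait(0)

    def set(self):
        if not self.is_set():
            self._send_sock.send(b"1")

    def clear(self):
        if self.is_set():
            self._recv_sock.recv(1)

    def fileno(self):
        # Lets selectors and select.select() accept the object directly.
        return self._recv_sock.fileno()

    def close(self):
        self._recv_sock.close()
        self._send_sock.close()


ev = SocketWaitableEvent()
ev.set()
was_set = ev.is_set()
ev.clear()
was_cleared = not ev.is_set()
ev.close()
```

Like the original, this checks the flag and then writes in two separate steps, so it makes no stronger thread-safety promise than the pipe version.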

Waiting for multiple event objects using the select method is as simple as this:

import selectors

sel = selectors.DefaultSelector()  # create selector
event1 = WaitableEvent()  # create event 1
event2 = WaitableEvent()  # create event 2
sel.register(event1, selectors.EVENT_READ, "event 1")
sel.register(event2, selectors.EVENT_READ, "event 2")

# wait for an event or until the 5s timeout expires
events = sel.select(timeout=5)

if not events:
    print("Timeout after 5s")
else:
    for key, mask in events:
        print(key.data)
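
In the snippet above nothing ever sets the events, so it always ends in the timeout branch. Below is a slightly fuller sketch. It repeats a condensed copy of WaitableEvent so that it runs on its own, and it uses threading.Timer (my choice for the demo, not part of the original) to set event 2 from another thread after half a second.

```python
import os
import select
import selectors
import threading


class WaitableEvent:
    """Condensed copy of the class above (pipe-based, so POSIX only)."""

    def __init__(self):
        self._read_fd, self._write_fd = os.pipe()

    def wait(self, timeout=None):
        rfds, _, _ = select.select([self._read_fd], [], [], timeout)
        return self._read_fd in rfds

    def set(self):
        if not self.wait(0):
            os.write(self._write_fd, b"1")

    def fileno(self):
        return self._read_fd


sel = selectors.DefaultSelector()
event1 = WaitableEvent()
event2 = WaitableEvent()
sel.register(event1, selectors.EVENT_READ, "event 1")
sel.register(event2, selectors.EVENT_READ, "event 2")

# Set event 2 from another thread after 0.5 s.
timer = threading.Timer(0.5, event2.set)
timer.start()

# Blocks without polling until an event is set or 5 s pass.
fired = [key.data for key, mask in sel.select(timeout=5)]
print(fired)

timer.join()
sel.close()
```

The main thread sleeps inside select the whole time and wakes up only when the timer thread writes to the pipe.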

Any suggestions or improvements? Leave a comment below :-).


Comments

6 responses to “Waiting on multiple Event objects in Python 3”

  1. Thank you for your article.
    How can I use this in my code?

    import os
    import time
    import datetime
    import logging

    from watchdog.observers.polling import PollingObserver as Observer
    from watchdog.events import LoggingEventHandler
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler

    import mysql.connector
    mydb = mysql.connector.connect(
        host="localhost",
        user="root",
        passwd="",
        database="html"
    )
    mycursor = mydb.cursor()

    class Watcher:
        list_path = '.\\a'

        def __init__(self):
            self.observer = Observer()

        def run(self):
            event_handler = Handler()
            # for i in range(len(self.list_path)):
            self.observer.schedule(event_handler, self.list_path, recursive=True)
            self.observer.start()
            try:
                while True:
                    time.sleep(6000)
            except:
                self.observer.stop()
                print("The app is stoped!,Please run again")
            self.observer.join()

    class Handler(FileSystemEventHandler):
        @staticmethod
        def on_any_event(event):
            if event.is_directory:
                return None

            elif event.event_type == 'created':
                # print(event.event_type)
                # Take any action here when a file is first created.
                # print("Received created event - %s." % event.src_path)
                # print(event.src_path)
                i = event.src_path
                #### add database ######
                file_path = i.replace('\n', '').replace('\\', '/')

                strPath = os.path.realpath(str(file_path))
                nmFolders = strPath.split(os.path.sep)
                length = len(nmFolders)

                # if nmFolders[-1].endswith('.html'):
                # print("Html file is generated! Generated time:", datetime.datetime.now(), " ||| Filename:", nmFolders[-1])
                print(nmFolders[-1])
                mycursor.execute("CREATE TABLE IF NOT EXISTS " + nmFolders[length-2] + " (id INT AUTO_INCREMENT PRIMARY KEY, file_name VARCHAR(255), file_root VARCHAR(255))ENGINE=INNODB")
                sql = """INSERT INTO rssfolder (foldername)
                SELECT * FROM (SELECT ' """ + nmFolders[length-2] + """ ' AS foldername) AS tmp
                WHERE NOT EXISTS (
                SELECT foldername FROM rssfolder where foldername=' """ + nmFolders[length-2] + """ '
                ) LIMIT 1;"""
                mycursor.execute(sql)

                sql = "INSERT INTO " + nmFolders[length-2] + " (file_name, file_root) VALUES (%s, %s)"

                val = (str(nmFolders[-1]), str(file_path))
                mycursor.execute(sql, val)
                mydb.commit()

            # elif event.event_type == 'modified':
            # # Taken any action here when a file is modified.
            # print("Received modified event - %s." % event.src_path)

    w = Watcher()
    w.run()

  2. Patrick

    Very nice!
    Just a clarification: on Python 3.7, select() returns a list of (key, events) tuples, where key is a named tuple, so you have to replace the line:
    for ev in sel.select(timeout):

    by

    for ev, mask in sel.select(timeout):

    Thanks for this very useful piece of code.

    1. Thanks, Patrick (and Bruno above). It seems the change is present even in 3.4, so there is no need to keep the old example. I updated the code accordingly.

  3. With this, I am able to wait for a timeout shared between events. If one times out, the next ones will wait 0 seconds.

    events=[threading.Event() for _ in range(10)] # make 10 events
    collections.deque((e.clear() for e in events), 0) # clear all event flags

    target_time = time.time() + timeout_seconds
    activated={e for e in events if e.wait(max(target_time - time.time(), 0))} # wait for events, get a set of activated ones

    1. Very nice code!
      It works perfectly for me, except for a detail on lines 9 and 10 that I had to modify like this:
      for (key, eventsmask) in sel.select(timeout):
          print(key.data)

  4. Alexander Mohr

    How about Semaphore?
