Django – run a function every x seconds

For many small projects celery is overkill. For those projects you can use schedule, it’s very easy to use.

With this library you can make any function execute a task periodically:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

while True:
    schedule.run_pending()
    time.sleep(1) 

The example runs in a blocking manner, but if you look in the FAQ, you will find that you can also run tasks in a parallel thread, such that you are not blocking, and remove the task once not needed anymore:

import threading    
import time 

from schedule import Scheduler

def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """

    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):

        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.setDaemon(True)
    continuous_thread.start()
    return cease_continuous_run


Scheduler.run_continuously = run_continuously

Here is an example for usage in a class method:

    def foo(self):
        ...
        if some_condition():
           return schedule.CancelJob  # a job can dequeue it

    # can be put in __enter__ or __init__
    self._job_stop = self.scheduler.run_continuously()

    logger.debug("doing foo"...)
    self.foo() # call foo
    self.scheduler.every(5).seconds.do(
        self.foo) # schedule foo for running every 5 seconds
    
    ...
    # later on foo is not needed any more:
    self._job_stop.set()
    
    ...
    
    def __exit__(self, exec_type, exc_value, traceback):
        # if the jobs are not stop, you can stop them
        self._job_stop.set()
    

Leave a Comment

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)