ScheduledExecutorService in Python with asyncio

Run scheduled tasks with ease in Python. The API of this implementation is based on Java's ScheduledExecutorService, but it has been made Pythonic. There are other ways of running scheduled tasks in Python, but this one seems like a natural fit for asyncio.

The source code for the scheduler can be found here https://github.com/gregbugaj/marie-ai/blob/main/marie/concur/ScheduledExecutorService.py

schedule_with_fixed_delay is the primary method of the scheduler; it has the following signature:

    def schedule_with_fixed_delay(
        self,
        func: Callable,
        initial_delay: int,
        delay: int,
        unit: TimeUnit,
        *args,
        **kwargs,
    ) -> Any:
        """
        Creates and executes a periodic action that becomes enabled first after the given initial delay,
        and subsequently with the given delay between executions.

        NOTE(review): the name mirrors Java's scheduleWithFixedDelay, where the delay is measured
        from the end of one execution to the start of the next -- confirm against the implementation.

        Args:
            func:  the task to execute
            initial_delay:  the time to delay first execution
            delay: the delay between successive executions
            unit:  the time unit of the initial_delay and delay parameters
            *args: additional positional arguments (presumably forwarded to func -- confirm)
            **kwargs: additional keyword arguments (presumably forwarded to func, e.g. name="T1"
                in the usage samples -- confirm)
        Return:
            a ScheduledTask representing pending completion of the task
        """

Creating a new scheduler and scheduling a task takes just a few lines of code:

# Task to execute
async def async_task(name: str):
    """Print a start line, pause for one second without blocking the loop, then print a completion line.

    Each line carries the given task name, the name of the current thread and
    the current ``time.time()`` value.

    Args:
        name: label printed at the beginning of both lines.
    """
    thread_name = current_thread().name
    started_at = time.time()
    print(f"{name} : {thread_name} {started_at}  Start")

    await asyncio.sleep(1)

    # The thread name and the clock are read again so the second line reflects
    # the state after the pause.
    thread_name = current_thread().name
    finished_at = time.time()
    print(f"{name} : {thread_name} {finished_at} Complete")


# Create scheduler and schedule a task
scheduler = ScheduledExecutorService.new_scheduled_asyncio_pool()
# Arguments: the coroutine function, a numeric interval (1) and its time unit.
# name="T1" matches async_task's `name` parameter (presumably forwarded by the scheduler -- confirm).
t1 = scheduler.schedule_at_fixed_rate(async_task, 1, TimeUnit.MILLISECONDS, name="T1")

    

Sample Usage

import asyncio
import threading
import time

from marie.concur import ScheduledExecutorService

from threading import current_thread

from marie.concur.ScheduledExecutorService import TimeUnit


async def async_task(name: str):
    """Sample task: report start, wait one second asynchronously, report completion.

    Both printed lines contain the task name, the current thread's name and a
    ``time.time()`` timestamp.

    Args:
        name: identifier shown at the start of each printed line.
    """
    worker = current_thread().name
    stamp = time.time()
    print(f"{name} : {worker} {stamp}  Start")

    await asyncio.sleep(1)

    # Re-read the thread name and timestamp for the completion line.
    worker = current_thread().name
    stamp = time.time()
    print(f"{name} : {worker} {stamp} Complete")


async def cancel_callback(task):
    """Announce that ``task`` is being cancelled, then await its ``stop()`` coroutine.

    Args:
        task: an object exposing an awaitable ``stop()`` method.
    """
    notice = f"canceling task : {task}"
    print(notice)
    stopping = task.stop()
    await stopping


async def main():
    """Schedule two fixed-rate tasks ("T1" and "T2") and queue a stop of the first one.

    The stop is registered with the event loop's ``call_later`` using a delay
    of 3 (seconds, per the asyncio API).
    """
    pool = ScheduledExecutorService.new_scheduled_asyncio_pool()

    first = pool.schedule_at_fixed_rate(async_task, 1, TimeUnit.MILLISECONDS, name="T1")
    second = pool.schedule_at_fixed_rate(async_task, 2, TimeUnit.MILLISECONDS, name="T2")  # noqa: F841

    # call_later() takes a plain callable, so it receives asyncio.create_task
    # plus the coroutine object that should be wrapped once the timer fires.
    event_loop = asyncio.get_event_loop()
    stop_first = cancel_callback(first)
    event_loop.call_later(3, asyncio.create_task, stop_first)


async def main_single():
    """Schedule a single fixed-rate task ("T1") and queue a stop of it.

    The stop is registered with the event loop's ``call_later`` using a delay
    of 3 (seconds, per the asyncio API).
    """
    pool = ScheduledExecutorService.new_scheduled_asyncio_pool()
    job = pool.schedule_at_fixed_rate(async_task, 1, TimeUnit.MILLISECONDS, name="T1")

    # call_later() only supports callbacks (regular functions); you can't pass in a coroutine.
    # Hence asyncio.create_task is the callback and the coroutine object is its argument.
    event_loop = asyncio.get_event_loop()
    stop_job = cancel_callback(job)
    event_loop.call_later(3, asyncio.create_task, stop_job)
    # Optionally wait on the scheduled task here: await job.task

async def main_delay():
    """Schedule "T1" with a fixed delay, start it, and stop it after 3 seconds.

    The task is created with ``initial_delay=2`` and ``delay=1`` in
    ``TimeUnit.MILLISECONDS``, started explicitly, and then awaited via
    ``t1.task``.
    """
    scheduler = ScheduledExecutorService.new_scheduled_asyncio_pool()
    t1 = scheduler.schedule_with_fixed_delay(
        async_task, 2, 1, TimeUnit.MILLISECONDS, name="T1"
    )

    await t1.start()
    print(t1.task)

    # call_later() invokes a plain callable. Passing the coroutine function
    # `cancel_callback` directly would only create a coroutine object that is
    # never awaited, so the task would never be stopped. Wrap it in a task
    # when the timer fires instead.
    asyncio.get_running_loop().call_later(
        3, lambda: asyncio.create_task(cancel_callback(t1))
    )

    await t1.task


if __name__ == "__main__":
    print("Main")
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and raises RuntimeError on Python 3.14+.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Alternative entry point: loop.create_task(main_single())
        # Keep a reference so the task is not garbage-collected while pending.
        main_task = loop.create_task(main())
        # The scheduled tasks keep running until interrupted (Ctrl-C).
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        print("Closing Loop")
        loop.close()

I am utilizing this in the Marie-AI project for my watchdog service.

Leave a Comment

Your email address will not be published. Required fields are marked *