Handling Python event loop shutdown without exceptions
Publish Date: Dec 21 '24
0 0
#!/usr/bin/env python3
from asyncio import gather, new_event_loop, sleep, Event, set_event_loop
from signal import SIGINT, SIGTERM


def shutdown_signaled() -> None:
    """Signal-handler callback: announce the request and set the shutdown event.

    Takes no arguments, so it sets the module-level ``shutdown`` event via
    the global name; that name must be bound before a signal is delivered.
    """
    print('Shutdown requested.')
    shutdown.set()


async def small_work(shutdown: Event) -> None:
    """Run short (0.5 s) work cycles until ``shutdown`` is set.

    The event is only checked between cycles, so a cycle that is already
    sleeping finishes and prints its message before the coroutine exits.

    Args:
        shutdown: Event that tells the coroutine to stop looping.
    """
    while not shutdown.is_set():
        await sleep(0.5)
        print('Small work is done!')
    print('Exited small work.')


async def big_work(shutdown: Event) -> None:
    """Run long (5 s) work cycles until ``shutdown`` is set.

    The event is only checked between cycles, so exit can lag a shutdown
    request by up to one full cycle.

    Args:
        shutdown: Event that tells the coroutine to stop looping.
    """
    while not shutdown.is_set():
        await sleep(5)
        print('Big work is done!!!')
    print('Exited big work.')


# Create the shutdown event
# Kept at module level: shutdown_signaled() sets this event via the global name.
shutdown = Event()

# Set up a new event loop and make it the current loop
event_loop = new_event_loop()
set_event_loop(event_loop)

# Set up signal handlers
event_loop.add_signal_handler(SIGINT, shutdown_signaled)
event_loop.add_signal_handler(SIGTERM, shutdown_signaled)

# Combine the tasks
combined_tasks = gather(small_work(shutdown), big_work(shutdown))

# Run the tasks and block until they complete. Close the loop afterwards,
# even on error, so its resources are released instead of leaking.
try:
    event_loop.run_until_complete(combined_tasks)
finally:
    event_loop.close()