David Blume's GitList
Repositories
python_pinger.git
Code
Commits
Branches
Tags
Search
Tree:
544bf52
Branches
Tags
main
python_pinger.git
long_lived_worker_queue.py
Add type hints.
David Blume
committed
544bf52
at 2020-12-30 23:04:36
long_lived_worker_queue.py
Blame
History
Raw
#!/usr/bin/python3
#
# Since this is meant to be run as a daemon, do not use #!/usr/bin/env python3,
# because the process name would then be python3, not based on __FILE__.
#
# [filename] -d 3 192.168.1.1 192.168.1.12 &
# disown [jobid]

import os
import sys
import subprocess
import time
from argparse import ArgumentParser
import threading
import queue
import signal
from typing import Dict, Optional

# Serializes access to the shared _results dict across the pinger threads.
pinger_lock = threading.Lock()


def log_path_for(script: str) -> str:
    """Return the log file path for *script*: ``~/log/<script stem>.txt``.

    Only the final extension of the script's base name is swapped for
    ``.txt``. (A plain ``str.replace('py', 'txt')`` would also rewrite any
    other "py" in the name, e.g. ``python_pinger.py`` -> ``txtthon_pinger.txt``.)
    """
    stem, _ext = os.path.splitext(os.path.basename(script))
    return os.path.join(os.path.expanduser('~'), 'log', stem + '.txt')


def log_exit(sig: int, frame) -> None:
    """Signal handler: queue a final "Exiting." line, wait for it to be written, then exit.

    Installed for SIGINT and SIGTERM in the ``__main__`` section below.

    Args:
        sig: The signal number that was received.
        frame: The interrupted stack frame (unused).
    """
    _print_queue.put([f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
                      f'PID={os.getpid()} signal={signal.Signals(sig).name} Exiting.'])
    # Block until print_manager has called task_done() for everything queued,
    # so the exit message is in the log file before the process ends.
    _print_queue.join()
    sys.exit(0)


def pinger() -> None:
    """Worker loop: take an address off _ping_queue, ping it, and report changes.

    A line is queued for logging only when the result differs from the last
    result recorded for that address in _results.

    Issue: Each of these threads reads from and writes to the global _results
    dict, so that access is guarded by pinger_lock.
    """
    while True:
        host = _ping_queue.get()
        # Timestamp taken before the ping so the log reflects when the check
        # started, not when it timed out.
        now = time.localtime()
        success = ping(host)
        with pinger_lock:
            # Access to _results needs to be serialized
            need_update = success != _results[host]
            if need_update:
                _results[host] = success
        if need_update:
            status = "UP" if success else "DOWN"
            _print_queue.put([f'{time.strftime("%Y-%m-%d %H:%M:%S", now)} {host} {status}'])
        _ping_queue.task_done()


def ping(host: str) -> bool:
    """Return True if the ping command for *host* exits with status 0."""
    exit_status = subprocess.call([*_ping_cmd, host],
                                  stdout=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL)
    return exit_status == 0


def print_manager() -> None:
    """The only thread allowed to write output.

    Each queue item is a list of lines; every line is written via log().
    """
    while True:
        job = _print_queue.get()
        for line in job:
            log(line)
        _print_queue.task_done()


def log(*args, **kwargs) -> None:
    """Open the logfile, append the message, and close the logfile.

    Arguments are forwarded to print(), with the log file as the destination.
    """
    # I don't leave the log file open for writing because another process needs access too.
    with open(_logname, 'a') as fp:
        print(*args, **kwargs, file=fp)


if __name__ == '__main__':
    parser = ArgumentParser(description="Repeatedly ping sites. \nEx: %(prog)s -d 3 192.168.1.1 192.168.1.12")
    parser.add_argument('-d', '--delay', type=float, default=1.0, help='Delay between pings')
    parser.add_argument('addresses', nargs='+', help='Addresses to ping')
    parser_args = parser.parse_args()

    # Last known result per address. None means "not pinged yet", so the first
    # result for every address always counts as a change and gets logged.
    _results: Dict[str, Optional[bool]] = {addr: None for addr in parser_args.addresses}
    delay = parser_args.delay
    _ping_queue = queue.Queue(2 * len(parser_args.addresses))  # No more than 2 pending pings per address
    _print_queue = queue.Queue()
    _logname = log_path_for(sys.argv[0])

    # Choose the ping parameters appropriate for the platform
    if sys.platform in ('cygwin', 'win32'):
        _ping_cmd = ('ping', '-n', '1', '-w', '2000')
    else:
        _ping_cmd = ('ping', '-c', '1', '-W', '2', '-q')

    # One pinger thread per address given on the command line. Daemon threads
    # do not keep the process alive once the main thread exits.
    for _ in parser_args.addresses:
        threading.Thread(target=pinger, daemon=True).start()
    threading.Thread(target=print_manager, daemon=True).start()

    _print_queue.put([f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}'
                      f' PID={os.getpid()} repeat={delay}s addresses={",".join(parser_args.addresses)} Starting'])
    signal.signal(signal.SIGINT, log_exit)
    signal.signal(signal.SIGTERM, log_exit)

    # Main loop: enqueue every address, then wait. put() blocks while the
    # bounded queue is full, which throttles scheduling if pings fall behind.
    while True:
        for address in _results:
            _ping_queue.put(address)
        time.sleep(delay)