David Blume's GitList
Repositories
python_pinger.git
Code
Commits
Branches
Tags
Search
Tree:
544bf52
Branches
Tags
main
python_pinger.git
short_lived_workers.py
Add type hints.
David Blume
commited
544bf52
at 2020-12-30 23:04:36
short_lived_workers.py
Blame
History
Raw
#!/usr/bin/python3 # # Since this is meant to be run as a daemon, do not use #!/usr/bin/env python3, # because the process name would then be python3, not based on __FILE__. # # [filename] -d 3 192.168.1.1 192.168.1.12 & # disown [jobid] import os import sys import subprocess import time from argparse import ArgumentParser import threading import queue import signal from typing import Dict, Optional pinger_lock = threading.Lock() def log_exit(sig: int, frame) -> None: _print_queue.put([f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} ' f'PID={os.getpid()} signal={signal.Signals(sig).name} Exiting.']) _print_queue.join() sys.exit(0) def pinger(host: str) -> None: """Executes one ping, prints if there was a change, exits thread.""" now = time.localtime() success = ping(host) with pinger_lock: # Access to _results needs to be serialized need_update = success != _results[host] if need_update: _results[host] = success if need_update: status = "UP" if success else "DOWN" _print_queue.put([f'{time.strftime("%Y-%m-%d %H:%M:%S", now)} {host} {status}']) def ping(host: str) -> int: """Returns True if host (str) responds to a ping request.""" return subprocess.call([*_ping_cmd, host], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) == 0 def print_manager() -> None: """ The only thread allowed to write output. """ while True: job = _print_queue.get() for line in job: log(line) _print_queue.task_done() def log(*args, **kwargs) -> None: """Opens the logfile, writes the log, and closes the logfile.""" # I don't leave the log file open for writing because another process needs access too. with open(_logname, 'a') as fp: print(*args, **kwargs, file=fp) if __name__ == '__main__': parser = ArgumentParser(description="Repeatedly ping sites. Ex: %(prog)s -d 3 192.168.1.1 192.168.1.12") parser.add_argument('-d', '--delay', type=float, default=3.0, help='Delay between pings') parser.add_argument('addresses', nargs='+', help='Addresses to ping') parser_args = parser.parse_args() _results: Dict[str, Optional[int]] = dict() for addr in parser_args.addresses: _results[addr] = None delay = parser_args.delay _logname = os.path.join(os.path.expanduser('~'), 'log', os.path.basename(sys.argv[0]).replace('py', 'txt')) # Choose the ping parameters appropriate for the platform if sys.platform == 'cygwin' or sys.platform == 'win32': _ping_cmd = ('ping', '-n', '1', '-w', '2000') else: _ping_cmd = ('ping', '-c', '1', '-W', '2', '-q') _print_queue = queue.Queue() t = threading.Thread(target=print_manager, daemon=True) t.start() del t signal.signal(signal.SIGINT, log_exit) signal.signal(signal.SIGTERM, log_exit) _print_queue.put([f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}' f' PID={os.getpid()} repeat={delay}s addresses={",".join(parser_args.addresses)} Starting']) while True: for address in _results.keys(): t = threading.Thread(target=pinger, args=(address,), daemon=True) t.start() del t time.sleep(delay)