Serialize the reading and writing to the global dictionary.
David Blume

David Blume commited on 2019-07-08 20:23:11
Showing 3 changed files, with 19 additions and 7 deletions.

... ...
@@ -54,8 +54,8 @@ Multi-threaded ping calls won't block each other.
54 54
 
55 55
 ### Downsides
56 56
 
57
-The ping tasks read from, and write to a shared dictionary. (Reading and
58
-updating the last status.)
57
+The ping tasks read from, and write to a shared dictionary, so they need
58
+to serialize that access.
59 59
 
60 60
 ## Short Lived Workers
61 61
 
... ...
@@ -74,7 +74,7 @@ The worker tasks aren't in memory if they're not doing anything. So usually a sm
74 74
 
75 75
 ### Downsides
76 76
 
77
-The ping tasks still read from, and write to a shared dictionary. (Reading and updating the last status.)
77
+The ping tasks still read from, and write to that shared dictionary, so they serialize that access.
78 78
 
79 79
 ## Long Lived Looping Workers
80 80
 
... ...
@@ -13,6 +13,8 @@ from argparse import ArgumentParser
13 13
 import threading
14 14
 import queue
15 15
 
16
+pinger_lock = threading.Lock()
17
+
16 18
 
17 19
 def pinger():
18 20
     """Each queue item is a new address to ping. Ping and compare and print.
... ...
@@ -22,10 +24,14 @@ def pinger():
22 24
         host = _ping_queue.get()
23 25
         now = time.localtime()
24 26
         success = ping(host)
25
-        if success != _results[host]:
27
+        with pinger_lock:
28
+            # Access to _results needs to be serialized
29
+            need_update = success != _results[host]
30
+            if need_update:
31
+                _results[host] = success
32
+        if need_update:
26 33
             status = "UP" if success else "DOWN"
27 34
             _print_queue.put([f'{time.strftime("%Y-%m-%d %H:%M:%S", now)} {host} {status}'])
28
-            _results[host] = success
29 35
         _ping_queue.task_done()
30 36
 
31 37
 
... ...
@@ -13,15 +13,21 @@ from argparse import ArgumentParser
13 13
 import threading
14 14
 import queue
15 15
 
16
+pinger_lock = threading.Lock()
17
+
16 18
 
17 19
 def pinger(host):
18 20
     """Executes one ping, prints if there was a change, exits thread."""
19 21
     now = time.localtime()
20 22
     success = ping(host)
21
-    if success != _results[host]:
23
+    with pinger_lock:
24
+        # Access to _results needs to be serialized
25
+        need_update = success != _results[host]
26
+        if need_update:
27
+            _results[host] = success
28
+    if need_update:
22 29
         status = "UP" if success else "DOWN"
23 30
         _print_queue.put([f'{time.strftime("%Y-%m-%d %H:%M:%S", now)} {host} {status}'])
24
-        _results[host] = success
25 31
 
26 32
 
27 33
 def ping(host):
28 34