8.2. Inter-Thread Communication
8.2.1. Quick Tip
Use existing thread-safe queues for inter-thread communication.
8.2.2. Description
Communication between multiple threads (or, in the case of Python, multiple processes) is a very common requirement in parallel and concurrent applications. One thread or process produces data while another thread or process consumes it. In many cases there are multiple producers and multiple consumers, each group typically running in a thread or process pool. This scenario is usually solved by connecting the two sides with a queue.
The simplest approach to this scenario is to use a thread-safe (or process-safe) blocking queue, typically one that comes from the language’s standard library. In both the Java and Python example code, I use a blocking queue as the interface between a single producer and a single consumer. It would be very easy to adapt the example code to have multiple producers and/or multiple consumers, but for simplicity there is only one of each.
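As a rough illustration of the multi-producer, multi-consumer adaptation mentioned above, the following sketch uses plain `threading.Thread` objects and a bounded `queue.Queue`. The names `produce`, `consume`, and `run`, the use of `None` as the sentinel, and the queue size of 5 are all assumptions made for this sketch. The one detail that changes compared with the single-consumer case is that each consumer needs its own sentinel, and the sentinels should only be queued once every producer has finished.

```python
import queue
import threading

# Hypothetical stop marker for this sketch; it must never be a real message.
SENTINEL = None


def produce(q, start, count):
    """Put `count` consecutive integers, beginning at `start`, onto the queue."""
    for i in range(start, start + count):
        q.put(i)  # blocks while the queue is full


def consume(q, received):
    """Take messages until a sentinel arrives, collecting them in `received`."""
    for item in iter(q.get, SENTINEL):
        received.append(item)


def run(producer_count=3, consumer_count=2, per_producer=10):
    q = queue.Queue(maxsize=5)
    # One private result list per consumer, so no list is shared between threads.
    results = [[] for _ in range(consumer_count)]
    consumers = [
        threading.Thread(target=consume, args=(q, results[n]))
        for n in range(consumer_count)
    ]
    producers = [
        threading.Thread(target=produce, args=(q, n * per_producer, per_producer))
        for n in range(producer_count)
    ]
    for t in consumers + producers:
        t.start()
    for t in producers:
        t.join()
    # All real messages are queued now, so send exactly one sentinel per consumer.
    for _ in consumers:
        q.put(SENTINEL)
    for t in consumers:
        t.join()
    return sorted(item for chunk in results for item in chunk)
```

Calling `run()` returns every produced integer exactly once, regardless of which consumer happened to receive it.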
What makes this solution so powerful is the queue’s ability to block based on how many messages it currently holds. For producers, a put blocks when the queue has reached its maximum capacity: there is no room for another message, so the call waits until space becomes available. For consumers, a take blocks while the queue is empty and returns only once a producer has put an object onto the queue.
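The blocking behaviour described above can be observed directly by timing the calls. This is a minimal sketch: the helper names `time_blocked_put` and `time_blocked_get` and the `DELAY` value are assumptions chosen for illustration, and a helper thread stands in for the "other side" of the queue.

```python
import queue
import threading
import time

# Seconds the helper thread waits before acting (illustrative value only).
DELAY = 0.2


def time_blocked_put():
    """Return roughly how long put() waited on a full queue."""
    q = queue.Queue(maxsize=1)
    q.put("first")  # the queue is now at capacity

    def late_consumer():
        time.sleep(DELAY)
        q.get()  # frees a slot, which unblocks the put() below

    helper = threading.Thread(target=late_consumer)
    helper.start()
    started = time.monotonic()
    q.put("second")  # blocks until late_consumer removes "first"
    waited = time.monotonic() - started
    helper.join()
    return waited


def time_blocked_get():
    """Return the received message and roughly how long get() waited."""
    q = queue.Queue(maxsize=1)  # starts out empty

    def late_producer():
        time.sleep(DELAY)
        q.put("hello")

    helper = threading.Thread(target=late_producer)
    helper.start()
    started = time.monotonic()
    message = q.get()  # blocks until late_producer puts a message
    waited = time.monotonic() - started
    helper.join()
    return message, waited
```

Both functions should report a wait close to `DELAY`, since neither call can return before the helper thread acts.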
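Both the Java and Python queue APIs let you supply a timeout with each blocking call, for the scenario where you want to block but not indefinitely. In Java, `BlockingQueue` provides `offer(e, timeout, unit)` and `poll(timeout, unit)`; in Python, `put()` and `get()` accept a `timeout` argument.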
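A minimal Python sketch of the timeout option follows. The wrapper functions `try_put` and `try_get` are hypothetical helpers written for this example; the underlying behaviour is that `queue.Queue.put` raises `queue.Full` and `queue.Queue.get` raises `queue.Empty` when the timeout expires.

```python
import queue


def try_put(q, item, timeout):
    """Try to enqueue `item`; return False if the queue stays full for `timeout` seconds."""
    try:
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False


def try_get(q, timeout, default=None):
    """Try to dequeue a message; return `default` if none arrives within `timeout` seconds."""
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return default


q = queue.Queue(maxsize=1)
print(try_put(q, "first", timeout=0.1))   # True: there was room
print(try_put(q, "second", timeout=0.1))  # False: still full after 0.1 s
print(try_get(q, timeout=0.1))            # first
print(try_get(q, timeout=0.1))            # None: still empty after 0.1 s
```

Returning a flag or a default keeps the caller in control of what to do when the other side is slow, for example logging, retrying, or shutting down.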
In the case of Java, there are various Queue implementations, and picking one that matches the workload could squeeze out better performance from the application. There are blocking queues, non-blocking queues, queues backed by arrays (such as ArrayBlockingQueue), queues backed by linked lists (such as LinkedBlockingQueue), and even priority-based queues (such as PriorityBlockingQueue).
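The paragraph above concerns Java. As a loose Python analogue, the standard `queue` module offers a smaller set of thread-safe variants that differ in retrieval order: `Queue` (FIFO), `LifoQueue`, and `PriorityQueue`. The sketch below only illustrates that ordering difference; the helpers `fill` and `drain` and the sample messages are assumptions made for this example, and nothing here says anything about relative performance.

```python
import queue


def fill(q, items):
    """Put every item onto the queue and return the queue."""
    for item in items:
        q.put(item)
    return q


def drain(q):
    """Remove and return all messages currently in the queue, in retrieval order."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


# (priority, payload) pairs; a lower number is retrieved first by PriorityQueue.
messages = [(2, "normal"), (1, "urgent"), (3, "low")]

fifo_order = drain(fill(queue.Queue(), messages))
lifo_order = drain(fill(queue.LifoQueue(), messages))
priority_order = drain(fill(queue.PriorityQueue(), messages))

print(fifo_order)      # [(2, 'normal'), (1, 'urgent'), (3, 'low')]
print(lifo_order)      # [(3, 'low'), (1, 'urgent'), (2, 'normal')]
print(priority_order)  # [(1, 'urgent'), (2, 'normal'), (3, 'low')]
```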
8.2.3. Examples
8.2.3.1. Java
// javac InterThreadCommunication.java
// java -cp . InterThreadCommunication
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class InterThreadCommunication {

    private static class Consumer {
        private final BlockingQueue<Integer> queue;
        private final Integer sentinel;

        private Consumer(final BlockingQueue<Integer> queue,
                         final Integer sentinel) {
            this.queue = queue;
            this.sentinel = sentinel;
        }

        private void consume() {
            System.out.println("Consumer: started.");
            try {
                // take() blocks while the queue is empty; stop once the sentinel arrives.
                for (Integer i = queue.take(); !Objects.equals(i, sentinel); i = queue.take()) {
                    System.out.println("Received: " + i);
                }
            } catch (final InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can see it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                System.out.println("Consumer: finished.");
            }
        }
    }

    private static class Producer {
        private final BlockingQueue<Integer> queue;
        private final Integer sentinel;
        private final int messageCount;

        private Producer(final BlockingQueue<Integer> queue, final Integer sentinel,
                         final int messageCount) {
            this.queue = queue;
            this.sentinel = sentinel;
            this.messageCount = messageCount;
        }

        private void produce() {
            System.out.println("Producer: started.");
            try {
                for (int i = 0; i < messageCount; i++) {
                    // put() blocks while the queue is at capacity.
                    queue.put(i);
                }
                // Tell the consumer that no more messages will follow.
                queue.put(sentinel);
            } catch (final InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can see it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                System.out.println("Producer: finished.");
            }
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println("Main: started.");
        final int messageCount = 25;
        // The capacity is deliberately smaller than messageCount so the producer has to block.
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(messageCount / 5);
        final Integer sentinel = Integer.MIN_VALUE;
        final Consumer consumer = new Consumer(queue, sentinel);
        final Producer producer = new Producer(queue, sentinel, messageCount);
        final ExecutorService threadPool = Executors.newFixedThreadPool(2);
        try {
            threadPool.execute(consumer::consume);
            threadPool.execute(producer::produce);
        } finally {
            // Accept no new tasks, then wait for the two running tasks to finish.
            threadPool.shutdown();
            threadPool.awaitTermination(10, TimeUnit.SECONDS);
            System.out.println("Main: finished.");
        }
    }
}
8.2.3.2. Python
#!/usr/bin/env python3
# python3 ./InterThreadCommunication.py
import multiprocessing
import queue
from multiprocessing.pool import ThreadPool

# The number of messages to produce/consume.
MSG_COUNT = 25
# The maximum size of the queue after which put() calls will block until space is available.
QUEUE_SIZE = MSG_COUNT // 5
# The message to signify the consuming iterator should stop consuming.
SENTINEL = "STOP123"


def consumer(message_queue):
    print("- Consumer: started.")
    # iter(callable, sentinel) calls get() repeatedly until it returns SENTINEL.
    for i in iter(message_queue.get, SENTINEL):
        print(f"- Received: {i}")
    print("- Consumer: finished.", flush=True)


def producer(message_queue, max_count):
    print("+ Producer: started.")
    for i in range(max_count):
        message_queue.put(i)
        print(f"+ Produced: {i}")
    message_queue.put(SENTINEL)
    print("+ Producer: finished.", flush=True)


def run_as_thread_pool():
    print("#" * 40)
    print("Testing thread pool")
    thread_queue = queue.Queue(maxsize=QUEUE_SIZE)
    # Two workers are required: the consumer blocks until the producer runs.
    pool = ThreadPool(processes=2)
    pool.apply_async(func=consumer, args=(thread_queue,))
    pool.apply_async(func=producer, args=(thread_queue, MSG_COUNT))
    pool.close()
    pool.join()
    assert thread_queue.empty()
    print("Thread pools done.")


def run_as_process_pool():
    print("#" * 40)
    print("Testing process pool")
    # A manager queue is a proxy object that can be passed to pool workers.
    manager = multiprocessing.Manager()
    # Bounded like the thread queue, so put() blocks once QUEUE_SIZE is reached.
    process_queue = manager.Queue(QUEUE_SIZE)
    pool = multiprocessing.Pool(processes=2)
    pool.apply_async(func=consumer, args=(process_queue,))
    pool.apply_async(func=producer, args=(process_queue, MSG_COUNT))
    pool.close()
    pool.join()
    assert process_queue.empty()
    print("Process pools done.")


if __name__ == "__main__":
    run_as_thread_pool()
    run_as_process_pool()