Python multithreading pipeline
def consumer(pipeline, event):
    """Read messages from the pipeline and log each one as if storing it.

    The loop keeps going until the EXIT event has been set *and* the
    pipeline reports itself empty, so anything still queued when shutdown
    is requested is handled before the function returns.

    Args:
        pipeline: Object providing ``get_message(name)``, ``empty()`` and
            ``qsize()``.
        event: Object providing ``is_set()``; signals that it is time to stop.
    """
    while True:
        # Stop only once shutdown was requested and nothing is left to read.
        if event.is_set() and pipeline.empty():
            break

        received = pipeline.get_message("Consumer")
        remaining = pipeline.qsize()
        logging.info(
            "Consumer storing message: %s (queue size=%s)",
            received,
            remaining,
        )

    logging.info("Consumer received EXIT event. Exiting")
def producer(pipeline, event):
    """Push random integers into the pipeline until the EXIT event is set.

    Stands in for receiving numbers from the network: each iteration draws
    one integer, logs it, and hands it to the pipeline.

    Args:
        pipeline: Object providing ``set_message(message, name)``.
        event: Object providing ``is_set()``; signals that it is time to stop.
    """
    while True:
        if event.is_set():
            break

        # randint is inclusive on both ends, so values fall in 1..101.
        number = random.randint(1, 101)
        logging.info("Producer got message: %s", number)
        pipeline.set_message(number, "Producer")

    logging.info("Producer received EXIT event. Exiting")
if __name__ == "__main__":
    # Script entry point: run one producer and one consumer thread against a
    # shared pipeline for a short while, then signal both to stop.

    # Named log_format rather than "format" so the builtin format() is not
    # shadowed at module scope.
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # Uncomment for more verbose output:
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Keep the futures: an exception raised inside a worker is stored on
        # its future and would otherwise vanish without a trace.
        futures = [
            executor.submit(producer, pipeline, event),
            executor.submit(consumer, pipeline, event),
        ]

        # The event must be set inside the with-block: leaving the block
        # waits for both workers, and they only finish once it is set.
        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

    # The pool has shut down, so every future is finished and exception()
    # returns immediately.
    for future in futures:
        error = future.exception()
        if error is not None:
            logging.error("Main: worker raised an exception", exc_info=error)