The Producer-Consumer Pipeline: Part 2

In this lesson, you’ll begin creating a multi-threaded producer-consumer pipeline. If you download the sample code (.zip, 12.9 KB), you can get your own copy of 12-prodcom.py.

To learn more, you can also check out the documentation.

00:00 In the previous video, you learned what a producer-consumer pipeline is, and now you’re going to start to implement one. Start by creating a class called Pipeline and give it an .__init__() method with a parameter of capacity.

00:16 This will be the number of messages that you can put into your pipeline. Set self.capacity to that value, and then define self.message.

00:30 This is going to represent this shared piece of data that the producer puts in and the consumer reads from the pipeline. When a Pipeline is first initialized, set the .message to None.
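As a minimal sketch of the class described so far (the names follow the transcript; the demo lines at the bottom are only there to show the initial state):

```python
class Pipeline:
    """Single-slot container shared by the producer and the consumer."""

    def __init__(self, capacity):
        # Number of messages the producer will be allowed to create.
        self.capacity = capacity
        # The shared piece of data; empty until the producer sets it.
        self.message = None


pipeline = Pipeline(capacity=10)
print(pipeline.capacity, pipeline.message)  # prints: 10 None
```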

00:45 Now, create your producer and your consumer. Let’s start with the producer. Define the producer() function that is going to take a pipeline as well as a message. And recall that the producer’s job is to put messages into the pipeline.

01:02 We can only put capacity number of messages in the Pipeline, so let’s restrict that by creating messages within a loop. The loop header is for _ in range(pipeline.capacity): so we’re only going to create capacity number of messages. For the message itself, instead of doing something really complex, let’s just make it a random number.

01:30 So we’ll say random.randint() and let’s say from 1 to 100, with both endpoints included. We’ll need to import the random module.

01:43 And so our producer() is going to produce a message that’s a random number, and then we need to get that message into the pipeline.

01:50 So we need to write a method in our Pipeline class that allows us to set a .message. But for now in our producer() we can just say pipeline.set_message() and we’ll set the message that we create here.

02:08 Now, in the end, when we’re done looping capacity number of times and creating capacity number of messages and putting them in the pipeline, we need to signal to our consumer that we are done—that the producer cannot produce any more messages.

02:24 So when we’re out of the loop, make another call to pipeline.set_message(), and give it a special message, so let’s just say that special message is FINISH.

02:38 And then this will be kind of like a global variable, if you will—a flag that the consumer is going to read. So let’s define FINISH outside the class so that all parts of our program can read this variable. This will just be a string and let’s say it’s 'THE END'.

02:59 The producer() is going to create capacity number of messages, put them in the pipeline, and then when it’s done, it will put a message of 'THE END' to represent it’s done producing.
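The producer described above could look roughly like this. Since .set_message() has not been written yet at this point in the lesson, the sketch uses RecordingPipeline, a hypothetical stand-in that simply collects what it is given. The message parameter is left out because the loop generates each message, a correction the lesson itself makes later.

```python
import random

FINISH = 'THE END'  # sentinel telling the consumer that production is over


class RecordingPipeline:
    """Hypothetical stand-in, used only so this sketch runs on its own."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.sent = []

    def set_message(self, message):
        self.sent.append(message)


def producer(pipeline):
    # Create exactly `capacity` messages, each a random integer in 1..100.
    for _ in range(pipeline.capacity):
        message = random.randint(1, 100)
        pipeline.set_message(message)
    # After the loop, send the sentinel so the consumer knows to stop.
    pipeline.set_message(FINISH)


demo = RecordingPipeline(capacity=3)
producer(demo)
print(demo.sent)  # three random integers followed by 'THE END'
```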

03:11 Now you need to define your .set_message() method in the Pipeline. Underneath .__init__(), create a .set_message() method that takes in a message.

03:23 And let’s make a print statement here and we’ll say f'producing message of {message}'. And then the .set_message() method will take self.message and set it to the message that was passed in by the producer(), or rather, the message that the producer() generated.

03:45 So now, you’ve created your producer() function along with the .set_message() method that puts the message in the pipeline.
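A sketch of the Pipeline class with .set_message() added, assuming the print wording from the transcript. The value 42 is an arbitrary example.

```python
class Pipeline:
    def __init__(self, capacity):
        self.capacity = capacity
        self.message = None

    def set_message(self, message):
        print(f'producing message of {message}')
        # Overwrites whatever was stored before; there is only one slot.
        self.message = message


pipeline = Pipeline(capacity=10)
pipeline.set_message(42)
print(pipeline.message)  # prints: 42
```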

03:52 Now you need to create your consumer() function.

03:57 Drop down below your producer() and define consumer(), and this is going to take a pipeline as well. The consumer’s job is to read messages off of the queue and then do something with them. And since we used this FINISH flag, we can write a loop within the consumer() function.

04:16 while message is not FINISH: then we’ll read in the message, so we’ll say message = get_message(). Once we get the message, we’ll check whether it is the 'THE END' value that we’ve defined here as FINISH.

04:36 We’ll say if message is FINISH

04:42 Rather, if message is not FINISH: then we will take that message and maybe send it to another server or do some computation. For now we’ll just simulate that work with a time.sleep(), and we’ll sleep for a random amount of time.

04:58 random.random() produces a float from 0.0 up to, but not including, 1.0, so we’ll sleep for some amount of time less than 1 second.

05:09 At this point, when the message that is read is 'THE END', that is, FINISH, we’ll just exit the loop, leave this function, and return to the main thread.

05:19 You may have noticed that we are referencing message without defining it in the scope of this function. So what you need to do is actually define message before we enter the loop, and you can just set it to None. As long as it’s defined, that will allow us to continue.

05:36 Now you’ve created your consumer() function and it’s reading messages off the queue until it hits that FINISH message, and then it will break out and return to the main thread.

05:45 But you still need to define the get_message() method, and that is actually part of the Pipeline class, so this is actually going to be pipeline.get_message().
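Here is one way the consumer might look once the call is written as pipeline.get_message(). Because the real method has not been defined yet at this point, the sketch feeds the consumer from ScriptedPipeline, a hypothetical stand-in that hands out a fixed list of messages.

```python
import random
import time

FINISH = 'THE END'


class ScriptedPipeline:
    """Hypothetical stand-in that hands out a fixed sequence of messages."""

    def __init__(self, messages):
        self._messages = iter(messages)
        self.handed_out = []

    def get_message(self):
        message = next(self._messages)
        self.handed_out.append(message)
        return message


def consumer(pipeline):
    message = None  # defined up front so the loop condition can be evaluated
    while message is not FINISH:
        message = pipeline.get_message()
        if message is not FINISH:
            # Simulate doing some work with the message.
            time.sleep(random.random())


scripted = ScriptedPipeline([23, 36, FINISH])
consumer(scripted)
print(scripted.handed_out)  # prints: [23, 36, 'THE END']
```

The identity check with is not works here because the very same FINISH object travels through the pipeline. Comparing with != would be the more defensive choice if the sentinel could ever be rebuilt as a separate string.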

05:56 And then come on up to your Pipeline class and define your .get_message() method, and this does not take any arguments besides the Pipeline itself.

06:08 And let’s do a print statement under here, that says f'consuming message of {self.message}'. .get_message() will read the message off of the Pipeline, so we’ll say message = self.message.

06:30 And then we want to return that message back to the consumer(). If we go back down to the consumer(), we’ll see the message is equal to the return value of pipeline.get_message().
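Put together, the class with both methods could be sketched as follows. The value 23 is just an example message.

```python
class Pipeline:
    def __init__(self, capacity):
        self.capacity = capacity
        self.message = None

    def set_message(self, message):
        print(f'producing message of {message}')
        self.message = message

    def get_message(self):
        print(f'consuming message of {self.message}')
        message = self.message
        return message


pipeline = Pipeline(capacity=10)
pipeline.set_message(23)
received = pipeline.get_message()
print(received)  # prints: 23
```

Note that reading does not clear the slot: pipeline.message still holds 23 after the call, so a second .get_message() would return the same value again.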

06:43 And we check if that’s 'THE END'. If it’s not, we continue reading from the pipeline.

06:52 Now that you’ve created your consumer() function, along with the corresponding .get_message() method that goes into your Pipeline, we have all the components we need to actually run this producer-consumer pipeline.

07:07 Go ahead and open up a terminal and let’s execute this program.

07:14 And I totally forgot to make an entry point, ha, so let’s exit out of the terminal, drop down underneath your consumer() function, and create your entry point.

07:23 if __name__ == '__main__': then we’ll create a Pipeline. So we’ll say pipeline = Pipeline() and we need to pass it the capacity—let’s do 10 for now.

07:38 Then we’ll use our ThreadPoolExecutor to create the producer and the consumer threads. with concurrent.futures.ThreadPoolExecutor() and the max_workers is going to be 2—one for each thread.

07:59 So we have ex and we’ll use .submit() again to start the threads; leaving the with block waits for them to finish. The target function is producer, so we want to start with the producer and we want to pass in the pipeline.

08:15 The producer() function takes in a pipeline and a message. Actually, this is incorrect: we don’t need to pass in a message here.

08:30 The message is actually generated in this loop. All we need to do is pass the producer() the pipeline.

08:38 And then we do the same thing down here for our consumer.
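Assembled into one file, the program at this stage might look like the sketch below. The main() wrapper is a hypothetical addition (the lesson puts the code directly under the entry point), and the imports of concurrent.futures and time are included up front.

```python
import concurrent.futures
import random
import time

FINISH = 'THE END'


class Pipeline:
    def __init__(self, capacity):
        self.capacity = capacity
        self.message = None

    def set_message(self, message):
        print(f'producing message of {message}')
        self.message = message

    def get_message(self):
        print(f'consuming message of {self.message}')
        return self.message


def producer(pipeline):
    for _ in range(pipeline.capacity):
        pipeline.set_message(random.randint(1, 100))
    pipeline.set_message(FINISH)


def consumer(pipeline):
    message = None
    while message is not FINISH:
        message = pipeline.get_message()
        if message is not FINISH:
            time.sleep(random.random())


def main():
    # Hypothetical wrapper so the entry point stays a single call.
    pipeline = Pipeline(10)
    # Two workers: one thread for the producer, one for the consumer.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        ex.submit(producer, pipeline)
        ex.submit(consumer, pipeline)
    # Leaving the with block waits for both submitted calls to finish.
    return pipeline


if __name__ == '__main__':
    main()
```

The order in which the producing and consuming lines appear depends on thread scheduling, so the printed output can differ from run to run.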

08:44 I think it’s nice to be able to see the Pipeline at the end of the program, so go down underneath the ThreadPoolExecutor block and we’ll do a print statement and we’ll say f'producer: {producer_pipeline}', and then we’ll do the same for our consumer, and this is consumer_pipeline. So we haven’t defined these yet, but we are going to right now. Right above our entry point we’ll say producer_pipeline is just an empty list, and same for consumer_pipeline.

09:27 And that means we’re going to want to append the messages as they are produced and consumed. So in the .set_message() and .get_message() methods that you created, add in a call to append to those lists. The .set_message() is used by the producer, so producer_pipeline.append(message).

09:52 And then we can copy that into the .get_message() and just change producer to consumer. And this is just for learning purposes so you can see the end result of the action from the producer and the consumer.
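The bookkeeping described above can be sketched like this. The two module-level lists use the names from the transcript; where exactly the append call sits inside .set_message() is an assumption.

```python
producer_pipeline = []  # every message handed to set_message()
consumer_pipeline = []  # every message returned by get_message()


class Pipeline:
    def __init__(self, capacity):
        self.capacity = capacity
        self.message = None

    def set_message(self, message):
        print(f'producing message of {message}')
        producer_pipeline.append(message)
        self.message = message

    def get_message(self):
        print(f'consuming message of {self.message}')
        message = self.message
        consumer_pipeline.append(message)
        return message


pipeline = Pipeline(10)
pipeline.set_message(23)
pipeline.get_message()
print(f'producer: {producer_pipeline}')  # prints: producer: [23]
print(f'consumer: {consumer_pipeline}')  # prints: consumer: [23]
```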

10:09 Now, open up a terminal and let’s try to execute our program.

10:19 I forgot to import concurrent.futures, so I’m going to go up here and import concurrent.futures. Also, I’m going to need to import time.

10:32 Save that, clear the screen, and let’s try this again. Okay. That was very fast. Let’s go ahead and scroll up to the top of the output, where we see producing message of 1.

10:45 We see a bunch of producing messages and we actually see one consuming message of THE END. So, this isn’t exactly what we want.

10:56 We produced 10 messages, but we only read the last one. Let’s just execute this again and see the output. What we would want is the producer to produce 23, then the consumer to consume 23, and then 36 and 36, and so on.
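One plausible reading of that output can be reproduced without any threads at all. The sketch below is not the lesson's code: slot stands in for Pipeline.message, and the numbers are arbitrary examples. It assumes the producer finishes all of its writes before the consumer gets its first read.

```python
FINISH = 'THE END'

produced = []
slot = None  # stands in for Pipeline.message

# The producer runs to completion without ever waiting for the consumer.
for message in [23, 36, 71, FINISH]:
    produced.append(message)
    slot = message  # each write replaces the previous one

# By the time the consumer gets its first turn, only the last write is left.
consumed = [slot]

print(f'producer: {produced}')  # prints: producer: [23, 36, 71, 'THE END']
print(f'consumer: {consumed}')  # prints: consumer: ['THE END']
```

Nothing in the pipeline makes the producer wait until a message has been read, so earlier messages are simply overwritten.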

11:18 So, we’re getting there—we’ve actually created a pipeline as well as a consumer and a producer with their corresponding methods, but something isn’t right here.

11:28 In the next video, let’s see if we can fix this problem.

fofanovis on April 4, 2020

I would add that you face a deadlock not because the locking/releasing is inherently wrong, but because you do not catch the exception thrown by the release method. A RuntimeError is raised when you try to release a lock that is already released. BTW, it is not enough to just check the locked flag, because of possible race conditions, so the release calls need to be wrapped in try-except, and then everything will work fine.

I see that initial locking helps but anyway such code seems too fragile to me. So, I would add try-excepts anyway.
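For illustration only, the try-except wrapping suggested here might look like the following sketch. safe_release is a hypothetical helper name, not something from the lesson, and whether swallowing the error is a good design is a separate question.

```python
import threading


def safe_release(lock):
    """Hypothetical helper: release the lock, tolerating a double release."""
    try:
        lock.release()
        return True
    except RuntimeError:
        # threading.Lock raises RuntimeError when it is not currently locked.
        return False


lock = threading.Lock()
lock.acquire()
print(safe_release(lock))  # prints: True
print(safe_release(lock))  # prints: False
```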

mo on Jan. 2, 2022

Thanks a lot for this lesson. I have a question. The method get_message is written as follows in this lesson:

    ...
    def get_message(self):
        print(f'consuming message of {self.message}')
        message = self.message
        consumer_pipeline.append(message)
        return message
    ...

My question is: why is self.message copied into message first? Why isn’t self.message used and returned directly? In other words: why not like this:

    ...
    def get_message(self):
        print(f'consuming message of {self.message}')
        consumer_pipeline.append(self.message)
        return self.message
    ...

I tried it and it seems to work the same. But if there is a reason to use an intermediate message variable, I would like to understand it.

Manish Sharma on Oct. 7, 2023

Hey, anyone reading these comments to answer the questions?

Bartosz Zaczyński RP Team on Oct. 8, 2023

@Manish Sharma Our team reads questions like this every day, trying to answer them as best as we can. That said, there are several questions submitted every day, so it’s not always possible for us to answer each and every one. We encourage other users to also share their knowledge and help answer questions in the comments section or take advantage of the collective knowledge of our Slack community.

Though I’m not the creator of this course, the use of message = self.message looks redundant in this case, as someone has correctly pointed out before. This is probably a matter of coding style or a way to be explicit for teaching purposes.
