Introduction
Our goal is to read a local video file and push it into Kafka frame by frame, then use a consumer to read the topic, decode the data and play the video on screen.
We will use the OpenCV library for Python to read the file and encode each frame before sending it to Kafka.
Note: The stream is image only; no audio is streamed in this example. If you also want to stream audio, you need additional tools and should write to two different topics, one for audio and one for video.
The consumer will decode the messages back into frames before displaying them on screen.
Requirements
These are the only requirements:
- A Kafka cluster up and running
- Python3
- opencv-python library for Python3
- kafka-python library for Python3
- A video file stored locally
We will start with the producer first since the consumer is harder to test without having data in the topic.
Kafka Producer
Let us create a simple class with a method that creates the Kafka producer client.
from sys import argv

import cv2
from kafka import KafkaProducer


class KafkaVideoStreaming():
    def __init__(self, bootstrap_servers, topic, videoFile, client_id, batch_size=65536):
        self.videoFile = videoFile
        # The file name doubles as the message key, so all frames share one key
        self.topicKey = str(videoFile)
        self.topic = topic
        self.batch_size = batch_size
        self.client_id = client_id
        self.bootstrap_servers = bootstrap_servers

    def set_producer(self):
        self.producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            api_version=(0,10,1),
            client_id=self.client_id,
            acks=1,
            value_serializer=None,       # frames are already bytes
            key_serializer=str.encode,   # the key is a plain string
            batch_size=self.batch_size,
            compression_type='gzip',
            linger_ms=0,
            buffer_memory=67108864,
            max_request_size=65536,
            max_in_flight_requests_per_connection=1,
            retries=1,
        )
The above class expects some arguments during init and defines a method for creating our Kafka producer client.
The mandatory arguments are:
- bootstrap_servers: The Kafka bootstrap servers as host:port, e.g. localhost:9092
- topic: The Kafka topic in which we will write
- videoFile: The local video file that we will read
- client_id: The ID of our producer
Some words about the Kafka producer:
- We use acks=1 so that the partition leader acknowledges every write. Frame order is preserved because all messages share one key (and therefore one partition) and only one request is in flight at a time
- We do not serialize the payload because we encode the frames ourselves using the methods of OpenCV
- We use gzip to compress our messages before sending them to Kafka
- The maximum request size we allow is 65536 bytes, and batch_size is also 65536, meaning that at worst we get 1 msg/batch. An encoded frame larger than this limit will be rejected by the producer, so raise max_request_size for high-resolution video
- Our buffer memory is 67108864 bytes, which holds at most 1024 full batches (67108864 / 65536)
- We set max_in_flight_requests_per_connection to 1 since we do not want any message re-ordering when a failed request is retried
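The sizing numbers above are easy to check by hand. The following is a minimal sketch of that arithmetic; the helper names batches_per_buffer and fits_in_request are made up for illustration and are not part of kafka-python, and the size check ignores key and protocol overhead.

```python
# Producer sizing values taken from the KafkaProducer call above.
BUFFER_MEMORY = 67108864    # bytes, buffer_memory
BATCH_SIZE = 65536          # bytes, batch_size
MAX_REQUEST_SIZE = 65536    # bytes, max_request_size


def batches_per_buffer(buffer_memory=BUFFER_MEMORY, batch_size=BATCH_SIZE):
    """Return how many full batches fit in the producer buffer."""
    return buffer_memory // batch_size


def fits_in_request(payload, max_request_size=MAX_REQUEST_SIZE):
    """Rough check that the payload alone is within max_request_size.

    Key and protocol overhead are ignored, so a payload right at the
    limit may still be too large in practice.
    """
    return len(payload) <= max_request_size


print(batches_per_buffer())                 # 1024
print(fits_in_request(b"\x00" * 50_000))    # True
print(fits_in_request(b"\x00" * 200_000))   # False
```

In the producer, a check like this could be applied to buffer.tobytes() before calling send, to spot frames that are too large for the configured limit.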
With that out of the way, let us move on and create the run method, which reads the video file and calls the producer to send the frames.
Note: I will show only the snippet related to the run method to keep this page short. The full producer code is provided at the end.
    def reportCallback(self, record_metadata):
        print("Topic Record Metadata: ", record_metadata.topic)
        print("Parition Record Metadata: ", record_metadata.partition)
        print("Offset Record Metatada: ", record_metadata.offset)

    def errCallback(self, excp):
        print('Errback', excp)

    def run(self):
        print("Opening file %s" % self.videoFile)
        __VIDEO_FILE = cv2.VideoCapture(self.videoFile)
        self.set_producer()
        self.keep_processing = True
        try:
            while __VIDEO_FILE.isOpened() and self.keep_processing:
                readStat, frame = __VIDEO_FILE.read()
                if not readStat:
                    # No more frames: stop before trying to encode an empty frame
                    self.keep_processing = False
                    break
                # Encode the frame (a NumPy array) as a JPEG image
                ret, buffer = cv2.imencode('.jpg', frame)
                self.producer.send(
                    topic=self.topic, key=self.topicKey, value=buffer.tobytes()
                ).add_callback(
                    self.reportCallback
                ).add_errback(
                    self.errCallback
                )
        except KeyboardInterrupt:
            self.keep_processing = False
            print("Keyboard interrupt was detected. Exiting...")
        finally:
            __VIDEO_FILE.release()
            # Wait for buffered messages to be delivered before exiting
            self.producer.flush()
The first two methods in the snippet above serve as the callback and errback of the producer.
In the run method we first open the local file using OpenCV and create our producer client. What follows is a while loop inside a try clause that detects keyboard interrupts.
Inside the while loop we read the file frame by frame with __VIDEO_FILE.read(), which returns each frame as a NumPy array. We then encode the frame as a JPEG with cv2.imencode and call producer.send with the encoded buffer converted to bytes. Finally, we call release() once the video has been processed.
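The control flow of the read loop can be tried out without a video file or a broker. The sketch below is only an illustration: FakeCapture is a hypothetical stand-in that mimics the isOpened/read/release methods of cv2.VideoCapture, and stream_frames takes any callable in place of producer.send.

```python
class FakeCapture:
    """Hypothetical stand-in for cv2.VideoCapture serving a fixed list of frames."""

    def __init__(self, frames):
        self._frames = list(frames)
        self._opened = True

    def isOpened(self):
        return self._opened

    def read(self):
        # Mirrors the (status, frame) pair returned by VideoCapture.read()
        if self._frames:
            return True, self._frames.pop(0)
        return False, None

    def release(self):
        self._opened = False


def stream_frames(capture, send):
    """Read frames until read() reports failure and pass each one to send()."""
    sent = 0
    try:
        while capture.isOpened():
            ok, frame = capture.read()
            if not ok:
                break  # end of stream: never hand an empty frame to send()
            send(frame)
            sent += 1
    finally:
        capture.release()  # always release, even if send() raises
    return sent


outbox = []
capture = FakeCapture([b"f0", b"f1", b"f2"])
count = stream_frames(capture, outbox.append)
print(count, outbox)  # 3 [b'f0', b'f1', b'f2']
```

The important detail is the break on a failed read: without it the loop would pass an empty frame on to the encoder after the last real frame.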
Running the producer
To run the producer, append this code to the end of the script
if __name__ == "__main__":
    videoStream = KafkaVideoStreaming(
        bootstrap_servers='localhost:9092',
        topic='TestTopic',
        videoFile=argv[1],
        client_id='TestTopicKafkaVideoStreamClient',
    )
    videoStream.run()
and execute the producer script
# My video is in the same directory as the producer script
$> python3 producer.py video.mkv
The expected output if everything went well is
Topic Record Metadata: TestTopic
Parition Record Metadata: 0
Offset Record Metatada: 0
...
Topic Record Metadata: TestTopic
Parition Record Metadata: 0
Offset Record Metatada: 543
...
Topic Record Metadata: TestTopic
Parition Record Metadata: 0
Offset Record Metatada: 862
...
The producer is ready. Now we can move on to the consumer.
Kafka Consumer
As with the producer, we will start by creating a simple class that has one method for creating a Kafka consumer client.
from kafka import KafkaConsumer
from time import sleep

import cv2
import numpy as np
from queue import Queue
from threading import Thread


class KafkaVideoView():
    def __init__(self, bootstrap_servers, topic, client_id, group_id, poll=500, frq=0.01):
        self.topic = topic
        self.client_id = client_id
        self.group_id = group_id
        self.bootstrap_servers = bootstrap_servers
        self.poll = poll
        self.frq = frq

    def set_consumer(self):
        self.consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers.split(','),
            fetch_max_bytes=52428800,
            fetch_max_wait_ms=1000,
            fetch_min_bytes=1,
            max_partition_fetch_bytes=1048576,
            value_deserializer=None,
            key_deserializer=None,
            max_in_flight_requests_per_connection=10,
            client_id=self.client_id,
            group_id=self.group_id,
            auto_offset_reset='earliest',
            max_poll_records=self.poll,
            max_poll_interval_ms=300000,
            heartbeat_interval_ms=3000,
            session_timeout_ms=10000,
            enable_auto_commit=True,
            auto_commit_interval_ms=5000,
            reconnect_backoff_ms=50,
            reconnect_backoff_max_ms=500,
            request_timeout_ms=305000,
            receive_buffer_bytes=32768,
        )
For the arguments we define:
- bootstrap_servers: same as producer
- topic: same as producer
- client_id: This will be the consumer client ID
- group_id: The Consumer Group name
- poll: The maximum number of records to fetch on each poll
- frq: The delay in seconds between frames, which we use to control the frames/sec
and a few notes about the consumer client options:
- The *_bytes fetch limits are generous because we use a queue to buffer the records, so each poll can fetch as much as it can
- With auto_offset_reset='earliest' the client starts from the beginning of the topic when the group has no committed offset. Once offsets have been committed, the client resumes from where it left off
- We let the client commit offsets automatically every 5s
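To make the resume behaviour concrete, here is a toy model of how the starting position is chosen for one partition. It is a simplification written for this post, not code from kafka-python: the function starting_offset and its parameters are hypothetical, and the real client handles more cases.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="earliest"):
    """Toy model of where a group member begins reading a partition.

    committed: last committed offset for the group, or None if there is none
    log_start / log_end: first available offset and next offset to be written
    """
    # A valid committed offset always wins: the consumer resumes from it.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise the reset policy decides where to start.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError("unsupported policy: %s" % auto_offset_reset)


print(starting_offset(None, 0, 863))             # 0   (first run, whole video)
print(starting_offset(543, 0, 863))              # 543 (resume after a restart)
print(starting_offset(None, 0, 863, "latest"))   # 863 (only new frames)
```

This is why a brand new group_id replays the video from the first frame, while restarting the consumer with the same group_id continues near the point where it stopped.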
Next we will create the run and playStream methods. The run method polls Kafka for records and adds them to the queue, and the playStream method pops the records from the queue and plays them with OpenCV.
    def playStream(self, queue):
        while self.keepPlaying:
            try:
                msg = queue.get(block=True, timeout=20)
                self.queue_status = True
            except Exception:
                print("WARN: Timed out waiting for queue. Retrying...")
                self.queue_status = False
            if self.queue_status:
                # Reverse of the producer: bytes -> NumPy array -> decoded frame
                nparr = np.frombuffer(msg, np.uint8)
                frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                cv2.imshow('frame', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    self.keepConsuming = False
                    break
                sleep(self.frq)

    def run(self):
        self.keepPlaying = True
        self.set_consumer()
        self.videoQueue = Queue()
        self.keepConsuming = True
        self.playerThread = Thread(target=self.playStream, args=(self.videoQueue, ), daemon=False)
        self.playerThread.start()
        try:
            while self.keepConsuming:
                # poll() takes the timeout in ms first; max_records caps the batch
                payload = self.consumer.poll(timeout_ms=500, max_records=self.poll)
                for bucket in payload:
                    for msg in payload[bucket]:
                        self.videoQueue.put(msg.value)
        except KeyboardInterrupt:
            self.keepConsuming = False
            self.keepPlaying = False
            print("WARN: Keyboard Interrupt detected. Exiting...")
        self.playerThread.join()
        self.consumer.close()
Let us start with the run method. First we define some booleans that control the poll and queue play conditions, and we create the consumer. Then we create the Queue (self.videoQueue) and start the playStream method in a thread. Notice the queue that we pass as an argument (args=(self.videoQueue, )) to the playStream thread.
After starting the thread, the consumer polls the topic in a loop and adds the records of each poll bucket to the queue.
The playStream method loops, trying to pop a record from the queue and play it with OpenCV.
- The queue.get call blocks with a timeout of 20s. If no record is popped, the queue_status flag is set to false and the frame playing part is skipped.
- If we get a record from the queue, we follow the reverse logic of what we did with the producer. We create a NumPy array from the bytes, decode the array into a frame and display the frame with cv2.imshow.
- We also check whether the q key has been pressed. If it has, we set the keepConsuming flag to false so that the client stops consuming, and then we break out of the loop. A Ctrl+C in the terminal is handled in the run method instead, which sets both flags to false.
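The hand-off between the polling loop and the player thread can be exercised with the standard library alone. The sketch below is a variation on the approach above, not the same code: it uses a threading.Event instead of boolean flags and a None sentinel to mark the end of the stream, both of which are my own additions, and it collects frames in a list instead of displaying them.

```python
from queue import Queue, Empty
from threading import Thread, Event


def player(queue, stop, played, timeout=0.2):
    """Pop items from the queue until told to stop or the sentinel arrives."""
    while not stop.is_set():
        try:
            item = queue.get(block=True, timeout=timeout)
        except Empty:
            continue          # nothing arrived in time, try again
        if item is None:      # sentinel (an assumption of this sketch)
            break
        played.append(item)   # the real player would decode and show the frame


frames = Queue()
stop = Event()
played = []

worker = Thread(target=player, args=(frames, stop, played))
worker.start()

# Stand-in for the poll loop: push a few fake frames, then the sentinel
for i in range(5):
    frames.put(("frame-%d" % i).encode())
frames.put(None)

worker.join(timeout=5)
print(played)
```

Catching queue.Empty explicitly keeps real errors visible, and an Event can be set from any thread, which makes a clean shutdown on Ctrl+C straightforward.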
That is all the required code that we need to play our video.
Running the consumer
To run the consumer, append this code to the end of the script
if __name__ == "__main__":
    streamVideoPlayer = KafkaVideoView(
        bootstrap_servers='localhost:9092',
        topic='TestTopic',
        client_id='TestTopicVSClient',
        group_id='TestTopicVideoStreamConsumer',
        poll=500,
        frq=0.016
    )
    streamVideoPlayer.run()
and execute the consumer script
$> python3 consumer.py
If everything went well, after a while you should start seeing your video playing.
Note: With frq=0.016 the player sleeps 0.016s after each frame, which caps playback at roughly 60 frames/sec. Lower the value to play the video faster, or raise it to play it slower.
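The relation between frq and the frame rate is a simple reciprocal. The two helpers below are hypothetical names used only to show the arithmetic; the real playback rate will be somewhat lower than the computed ceiling, because decoding and displaying each frame also takes time.

```python
def delay_for_fps(fps):
    """Delay in seconds between frames for a target frame rate."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    return 1.0 / fps


def max_fps_for_delay(frq):
    """Upper bound on frames/sec when sleeping frq seconds per frame."""
    if frq <= 0:
        raise ValueError("frq must be positive")
    return 1.0 / frq


print(round(max_fps_for_delay(0.016), 1))   # 62.5
print(round(delay_for_fps(30), 4))          # 0.0333
print(round(delay_for_fps(24), 4))          # 0.0417
```

For example, a 30 frames/sec source would play at close to its original speed with frq=0.033.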
Below is the full code for each client.
Full Code
Producer Code
from sys import argv

import cv2
from kafka import KafkaProducer


class KafkaVideoStreaming():
    def __init__(self, bootstrap_servers, topic, videoFile, client_id, batch_size=65536):
        self.videoFile = videoFile
        # The file name doubles as the message key, so all frames share one key
        self.topicKey = str(videoFile)
        self.topic = topic
        self.batch_size = batch_size
        self.client_id = client_id
        self.bootstrap_servers = bootstrap_servers

    def set_producer(self):
        self.producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            api_version=(0,10,1),
            client_id=self.client_id,
            acks=1,
            value_serializer=None,       # frames are already bytes
            key_serializer=str.encode,   # the key is a plain string
            batch_size=self.batch_size,
            compression_type='gzip',
            linger_ms=0,
            buffer_memory=67108864,
            max_request_size=65536,
            max_in_flight_requests_per_connection=1,
            retries=1,
        )

    def reportCallback(self, record_metadata):
        print("Topic Record Metadata: ", record_metadata.topic)
        print("Parition Record Metadata: ", record_metadata.partition)
        print("Offset Record Metatada: ", record_metadata.offset)

    def errCallback(self, excp):
        print('Errback', excp)

    def run(self):
        print("Opening file %s" % self.videoFile)
        __VIDEO_FILE = cv2.VideoCapture(self.videoFile)
        self.set_producer()
        self.keep_processing = True
        try:
            while __VIDEO_FILE.isOpened() and self.keep_processing:
                readStat, frame = __VIDEO_FILE.read()
                if not readStat:
                    # No more frames: stop before trying to encode an empty frame
                    self.keep_processing = False
                    break
                # Encode the frame (a NumPy array) as a JPEG image
                ret, buffer = cv2.imencode('.jpg', frame)
                self.producer.send(
                    topic=self.topic, key=self.topicKey, value=buffer.tobytes()
                ).add_callback(
                    self.reportCallback
                ).add_errback(
                    self.errCallback
                )
        except KeyboardInterrupt:
            self.keep_processing = False
            print("Keyboard interrupt was detected. Exiting...")
        finally:
            __VIDEO_FILE.release()
            # Wait for buffered messages to be delivered before exiting
            self.producer.flush()


if __name__ == "__main__":
    videoStream = KafkaVideoStreaming(
        bootstrap_servers='localhost:9092',
        topic='TestTopic',
        videoFile=argv[1],
        client_id='TestTopicKafkaVideoStreamClient',
    )
    videoStream.run()
Consumer Code
from kafka import KafkaConsumer
from time import sleep

import cv2
import numpy as np
from queue import Queue
from threading import Thread


class KafkaVideoView():
    def __init__(self, bootstrap_servers, topic, client_id, group_id, poll=500, frq=0.01):
        self.topic = topic
        self.client_id = client_id
        self.group_id = group_id
        self.bootstrap_servers = bootstrap_servers
        self.poll = poll
        self.frq = frq

    def set_consumer(self):
        self.consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers.split(','),
            fetch_max_bytes=52428800,
            fetch_max_wait_ms=1000,
            fetch_min_bytes=1,
            max_partition_fetch_bytes=1048576,
            value_deserializer=None,
            key_deserializer=None,
            max_in_flight_requests_per_connection=10,
            client_id=self.client_id,
            group_id=self.group_id,
            auto_offset_reset='earliest',
            max_poll_records=self.poll,
            max_poll_interval_ms=300000,
            heartbeat_interval_ms=3000,
            session_timeout_ms=10000,
            enable_auto_commit=True,
            auto_commit_interval_ms=5000,
            reconnect_backoff_ms=50,
            reconnect_backoff_max_ms=500,
            request_timeout_ms=305000,
            receive_buffer_bytes=32768,
        )

    def playStream(self, queue):
        while self.keepPlaying:
            try:
                msg = queue.get(block=True, timeout=20)
                self.queue_status = True
            except Exception:
                print("WARN: Timed out waiting for queue. Retrying...")
                self.queue_status = False
            if self.queue_status:
                # Reverse of the producer: bytes -> NumPy array -> decoded frame
                nparr = np.frombuffer(msg, np.uint8)
                frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                cv2.imshow('frame', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    self.keepConsuming = False
                    break
                sleep(self.frq)

    def run(self):
        self.keepPlaying = True
        self.set_consumer()
        self.videoQueue = Queue()
        self.keepConsuming = True
        self.playerThread = Thread(target=self.playStream, args=(self.videoQueue, ), daemon=False)
        self.playerThread.start()
        try:
            while self.keepConsuming:
                # poll() takes the timeout in ms first; max_records caps the batch
                payload = self.consumer.poll(timeout_ms=500, max_records=self.poll)
                for bucket in payload:
                    for msg in payload[bucket]:
                        self.videoQueue.put(msg.value)
        except KeyboardInterrupt:
            self.keepConsuming = False
            self.keepPlaying = False
            print("WARN: Keyboard Interrupt detected. Exiting...")
        self.playerThread.join()
        self.consumer.close()


if __name__ == "__main__":
    streamVideoPlayer = KafkaVideoView(
        bootstrap_servers='localhost:9092',
        topic='TestTopic',
        client_id='TestTopicVSClient',
        group_id='TestTopicVideoStreamConsumer',
        poll=500,
        frq=0.016
    )
    streamVideoPlayer.run()
That's it! I hope you enjoyed it.