RabbitMQ & celery demo using image processing app on flask

Celery is a distributed system for processing messages on a task queue with a focus on real-time processing and support for task scheduling. When we have to run an expensive function that keeps user waiting for like “forever”, it is always better to use something like celery. In this blog we will be writing a face detection web app using flask, python-opencv and celery.

Before I can tell something, let me share a flask code snippet with you:

from time import sleep

@app.route("/")
def hello():
    sleep(10) # <---what would you see in this 10s?
    return "Hello World!"

Can you tell me what would you see in first 10s while we run our flask app? I know the answer, before getting the response it will keep the user waiting for 10s. We don’t love to wait 10s. We are so impatient, we want everything instantly thats the motivation that we have in modern computing. But life is cruel, we can’t get everything instantly, we understand that but our users DO NOT understand this simple truth. So we do what, we will try to sell them a feeling that we are working instantly, at least it is not taking forever to load. So we need to get over from that sleep block. How would we do that, that’s what I am going to discuss in this blog with a real life image processing app in flask.

Obviously in life we don’t need to write “sleep” to make our code run slower. We had to write plenty of function that makes our life slower. In this blog we will discuss we will be writing an application that enables user to upload a picture and we will help them to detect faces. So what is the function we have this face detection function which is very expensive. It takes almost 3-10s on my machine to detect the face of my favourite actress. Let me share my code:

#server.py

__author__ = 'sadaf2605'


import os
from flask import Flask, request, redirect, url_for
from werkzeug import secure_filename

import face_detect
from os.path import basename


UPLOAD_FOLDER = '/home/sadaf2605/flask_celery_upload_image/uploads'
ALLOWED_EXTENSIONS = set(['txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'])

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER


def allowed_file(filename):
    return '.' in filename and 
           filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS

@app.route('/', methods=['GET', 'POST'])
def upload_file():
    if request.method == 'POST':
        import time
        start_time = time.time()
        file = request.files['file']

        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)

            base,ext=os.path.splitext(filename)


            file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            face_detect.detect(os.path.join(app.config['UPLOAD_FOLDER'], filename),os.path.join(app.config['UPLOAD_FOLDER'], base+"-face"+ext))

            print "--- %s seconds ---" % str (time.time() - start_time)
            return redirect("/")
            return redirect(url_for('uploaded_file',
                                    filename="facedetect-"+filename))

    from os import listdir
    from os.path import isfile, join
    htmlpic=""
    for f in sorted(listdir(UPLOAD_FOLDER)):
        if isfile(join(UPLOAD_FOLDER,f)):
            print f
            htmlpic+="""
            
                
            
                """

    return '''
    
    

    
    Upload new File
    

Upload new File

'''+htmlpic from flask import send_from_directory @app.route('/uploads/') def uploaded_file(filename): return send_from_directory(app.config['UPLOAD_FOLDER'], filename) from werkzeug import SharedDataMiddleware app.add_url_rule('/uploads/', 'uploaded_file', build_only=True) app.wsgi_app = SharedDataMiddleware(app.wsgi_app, { '/uploads': app.config['UPLOAD_FOLDER'] }) if __name__ == "__main__": app.debug=True app.run()
#face_detect.py

import numpy as np
import cv2


face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

@app.task
def detect(src_img,dest_img):
    img = cv2.imread(src_img)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    faces = face_cascade.detectMultiScale(gray, 1.3, 1)
    for (x,y,w,h) in faces:
        cv2.rectangle(img,(x,y),(x+w,y+h),(255,0,0),5)
        roi_gray = gray[y:y+h, x:x+w]
        roi_color = img[y:y+h, x:x+w]


    cv2.imwrite(dest_img, img)

You can test this app running using

python server.py

But we don’t want our user to wait 10s to see the next page. So we will use celery and rabbitmq to help us. First of all lets install RABBITMQ and CELERY.

To install rabbitMq we will use aptitude because it installed all its dependency in the way, if you don’t have aptitude installed then:

sudo apt-get install aptitude

Now its time to install rabbitmq server

 sudo aptitude install rabbitmq-server

Now we will create user and server for rabbitmq.

sudo rabbitmqctl add_user rabbit_user password
sudo rabbitmqctl add_vhost /app_rabbit

we will set permission for our user to do everything

sudo rabbitmqctl set_permissions -p /app_rabbit rabbit_user ".*" ".*" ".*"

Now we need to restart rabbit server, so that the change gets implemented

sudo /etc/init.d/rabbitmq-server stop
sudo /etc/init.d/rabbitmq-server start

Now we will install celery:

pip install celery

Now we need to configure celery, and celery provides few decorator functions like @tasks to achieve our goal. Rabbitmq is default for celery. Now we need to know that celery communicate via broker url using a different port. We want to enqueue our image processing tasks, so we can define it in face_ditect.py but it will be better if we can put it in our server.py as it is the entry point… but whatever for now!

import numpy as np
import cv2

from celery import Celery

app= Celery(broker='amqp://rabbit_user:password@localhost:5672//app_rabbit' )


face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

@app.task
def detect(src_img,dest_img):
    img = cv2.imread(src_img)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    faces = face_cascade.detectMultiScale(gray, 1.3, 1)
    for (x,y,w,h) in faces:
        cv2.rectangle(img,(x,y),(x+w,y+h),(255,0,0),5)
        roi_gray = gray[y:y+h, x:x+w]
        roi_color = img[y:y+h, x:x+w]


    cv2.imwrite(dest_img, img)

Now it won’t change your life radically, because we are not using the decorator functions that celery provided us. To put that task in queue we need use delay function of decorator function. so we need to call detec_image.delay(src_img,dest_image) and we actually need to keep our celery server running other wise it will only put it in queue and wait for the server to run. In -A parameter of celery we need to mention which file the decorator functions are located.

To run celery server

celery worker -A detect_face -l INFO

So now finally we can change our server.py

__author__ = 'sadaf2605'


import os
from flask import Flask, request, redirect, url_for
from werkzeug import secure_filename

import face_detect
from os.path import basename


UPLOAD_FOLDER = '/home/sadaf2605/flask_celery_upload_image/uploads'
ALLOWED_EXTENSIONS = set(['txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'])

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER


def allowed_file(filename):
    return '.' in filename and 
           filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS

@app.route('/', methods=['GET', 'POST'])
def upload_file():
    if request.method == 'POST':
        import time
        start_time = time.time()
        file = request.files['file']

        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)

            base,ext=os.path.splitext(filename)


            file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            face_detect.detect.delay(os.path.join(app.config['UPLOAD_FOLDER'], filename),os.path.join(app.config['UPLOAD_FOLDER'], base+"-face"+ext))

            print "--- %s seconds ---" % str (time.time() - start_time)
            return redirect("/")
            return redirect(url_for('uploaded_file',
                                    filename="facedetect-"+filename))

    from os import listdir
    from os.path import isfile, join
    htmlpic=""
    for f in sorted(listdir(UPLOAD_FOLDER)):
        if isfile(join(UPLOAD_FOLDER,f)):
            print f
            htmlpic+="""
            
                
            
                """

    return '''
    
    

    
    Upload new File
    

Upload new File

'''+htmlpic from flask import send_from_directory @app.route('/uploads/') def uploaded_file(filename): return send_from_directory(app.config['UPLOAD_FOLDER'], filename) from werkzeug import SharedDataMiddleware app.add_url_rule('/uploads/', 'uploaded_file', build_only=True) app.wsgi_app = SharedDataMiddleware(app.wsgi_app, { '/uploads': app.config['UPLOAD_FOLDER'] }) if __name__ == "__main__": app.debug=True app.run()

So after uploading now in the front page the picture won’t show up you will need to refresh it couple of time after 5-6s to show up. Keep on refreshing and you may like to send me a pull request at: https://github.com/sadaf2605/facedetection-flaskwebapp-rabbitmq