【Webアプリ開発】顔検出、DB保存、加工結果映像リアルタイム表示 Python + OpenCV + SQLAlchemy + FastAPI
概要
タイトルの通り。
前にも記事を書いているようにお気に入りの笑い男GIFで遊ぶ。
と今回はFastAPIを使ってOpenCVの加工結果をDBに保存しつつ、Webブラウザ上に表示するアプリを作ってみた。
普段は画像認識系のエンジニア 兼 販促担当をやっているのだけど、プロト開発がサクッと出来るともっと売れそうなのでね。スキル向上を目指してね。
コードについて
公開先
GitHubに公開中。
基本的にはgit clone
して使ってもらえば動くと思う。
著作権的に画像とかは同梱してないので注意。各自ダウンロードのほど。
説明
構成要素
大きく分けると下記の4要素から成る。
- 画面
- 処理フローとStream API
- DBアクセス
- 顔検出&顔加工
画面
見えやすいところから噛み砕いていった方が分かりやすいと思うんでまずは画面について。
といってもtemplates/index.html
でシンプルに記述してるだけ。
Jinja2でパースできるようなフォーマット。
キモなのは
<img src="{{ url_for('video_feed') }}">
のところで、後に説明するFastAPIのスクリプト内で/video_feed
でroutingされるところにアクセスすると映像がStreamingで表示される。
<html> <head> <title>Video Streaming Demonstration</title> </head> <body> <h1>Video Streaming Demonstration</h1> <img src="{{ url_for('video_feed') }}"> </body> </html>
処理フローとStream API
ほいでここがメインな訳だけど、スクリプト名でいうとapp.py
で、DBアクセス、顔検出&顔加工とHTMLを繋ぐ役割を果たしてる。
importとか
まずはモジュールのimport
やら便利ツールのインスタンス化やらをしていきましょう。
ちなみにpyscripts
というのは自作モジュール名。Camera操作とか顔検出&加工とかDB操作とか。
import uvicorn import cv2 from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse from fastapi.templating import Jinja2Templates from pyscripts.camera import Camera from pyscripts.laughing_man import LaughingManMaskStream, detect_faces, overlay_lms from pyscripts.db import insert_faces app = FastAPI() templates = Jinja2Templates(directory="templates")
htmlのパース
index
関数はまぁ、ページ表示のためのテンプレ関数。ここでさっきのhtmlファイルをJinja2フォーマットでパースしてるのが分かる。
@app.getというのはFastAPIのデコレータ。第一引数でURLのパス指定をして、第二引数でレスポンス種別を指定。asyncはようわからん。
@app.get("/", response_class=HTMLResponse) async def index(request: Request): return templates.TemplateResponse('index.html', {"request": request})
コア
ほいでここがキモ。
gen
関数の中では、下記の処理を行うというフローを記述してる。
- WebCameraからフレームを読み込んで、
- 描画するgifフレームを読み込んで、
- 顔を検出して、
- gifフレームを描画して、
- DBに検出結果を保存して、
- 描画結果をバイト形式にエンコードして、
- HTMLレスポンスを生成
そんでもってvideo_feed
関数でストリーミングレスポンスとしてデータを返すということになってる。
さっきindex.html
で記述してた
<img src="{{ url_for('video_feed') }}">
で引っ張ってくるデータがこれなわけだ。
def gen(camera): lm_mask_stream = LaughingManMaskStream() while True: frame = camera.get_frame() lm_mask = lm_mask_stream.next() faces = detect_faces(frame) frame = overlay_lms(frame, faces, lm_mask) insert_faces(faces) _, jpeg = cv2.imencode('.jpg', frame) byte_frame = jpeg.tobytes() yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + byte_frame + b'\r\n') @app.get('/video_feed', response_class=HTMLResponse) async def video_feed(): return StreamingResponse(gen(Camera()), media_type='multipart/x-mixed-replace; boundary=frame')
サーバ起動
で最後↓でサーバを起動する。
if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)
DBアクセス
ということで、全体の流れは上記のような感じで記述できたので、以降はモジュールの詳しい説明になるわけだけども。
pyscripts/db.py
の中でsqlalchemyを使ったDB定義、初期化、挿入関数定義を行っていく。
from sqlalchemy import Column, Integer, create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker SQLALCHEMY_DATABASE_URL = "sqlite:///./data/face_location.db" engine = create_engine( SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}, echo=True ) Base = declarative_base() class Face(Base): __tablename__ = "face" id = Column(Integer, primary_key=True) st_x = Column(Integer) st_y = Column(Integer) width = Column(Integer) height = Column(Integer) Base.metadata.create_all(bind=engine) def insert_faces(faces): SessionClass = sessionmaker(autocommit=False, autoflush=False, bind=engine) session = SessionClass() for face in faces: face_instance = Face() face_instance.st_x = int(face[0]) face_instance.st_y = int(face[1]) face_instance.width = int(face[2]) face_instance.height = int(face[3]) session.add(face_instance) session.commit() session.close()
顔検出&顔加工
ここの説明はこちらのページに譲ろうかな。
構成要素だけ書くと、下記のような感じ。
LaughingManMaskStream
クラスでgifから1フレームずつ引っ張ってきて描画用に加工detect_faces
関数で顔検出overlay_lms
関数で加工
import os import cv2 import numpy as np def _extract_fg(img): fg_region = np.zeros(img.shape) for v, line in enumerate(img): non_white_pxs = np.array(np.where(line < 250)) if non_white_pxs.size > 0: min_h, max_h = np.min(non_white_pxs, axis=1)[0], np.max(non_white_pxs, axis=1)[0] fg_region[v, min_h:max_h, :] = 1 return fg_region class LaughingManMaskStream: def __init__(self, gif_path=os.path.join(os.path.abspath(__file__), "laughing_man.gif")): self.cap = cv2.VideoCapture(gif_path) self.fg_region = _extract_fg(self.cap.read()[1]) def next(self): ret, frame = self.cap.read() if not ret: self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0) ret, frame = self.cap.read() mask = frame * self.fg_region return mask def release(self): self.cap.release() def detect_faces(img): PATH_TO_HAARCASCADE = '/haarcascade_frontalface_default.xml' # To Edit cascade = cv2.CascadeClassifier(PATH_TO_HAARCASCADE) small_img = cv2.resize(img, (int(img.shape[1]/4), int(img.shape[0]/4))) faces = np.array(cascade.detectMultiScale(small_img)) * 4 return faces def _overlay_lm(frame, face, lm_mask): offset = 100 x = int(max(face[0] - offset / 2, 0)) y = int(max(face[1] - offset / 2, 0)) w = int(min(face[2] + offset, frame.shape[1] - x)) h = int(min(face[3] + offset, frame.shape[0] - y)) lm_mask = cv2.resize(lm_mask, (w,h)) mask_idxs = np.where(lm_mask>0) overlay_idxs = (mask_idxs[0]+y, mask_idxs[1]+x, mask_idxs[2]) frame[overlay_idxs] = lm_mask[mask_idxs] return frame def overlay_lms(frame, faces, lm_mask): if faces.shape[0] > 0: for face in faces: frame = _overlay_lm(frame, face, lm_mask) return frame
コードの走らせ方
GitHubのHow To Start
を参照のこと。
まとめ
いかがだったでしょうか。
なんていう、時代錯誤な、ボケなのか何なのかわからない、まとめ方が、真っ先に思いついた俺は、もうダメなのかもしれない。