DIY Camera (Part 2): The picamera2 Library
In the previous post we covered the Raspberry Pi camera's libcamera tools, which let you operate the camera from the command line. This post is about picamera2, the official Python library that Raspberry Pi provides on top of the libcamera driver. Picamera2 only supports Raspberry Pi OS Bullseye and newer.
On Raspberry Pi OS Bullseye and newer, picamera2 comes preinstalled, so there is no need to install it separately.
Before programming against picamera2 from Python, let's first use the libcamera tools to check that the camera works at all.
libcamera-hello -t 0
As covered in the previous post, this command opens a live preview window of the camera stream; -t 0 keeps it open indefinitely.
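Another quick check is to grab a single still from the command line (note that on newer Raspberry Pi OS releases these tools are gradually being renamed to rpicam-*):
libcamera-jpeg -o test.jpg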
picamera2 is installed into the system environment by default. However, on the Raspberry Pi, installing new packages into the system environment with pip install fails with the following error:
yan@raspberrypi:~ $ pip3 install pytesseract
error: externally-managed-environment
× This environment is externally managed
╰─> To install Python packages system-wide, try apt install
python3-xyz, where xyz is the package you are trying to
install.
If you wish to install a non-Debian-packaged Python package,
create a virtual environment using python3 -m venv path/to/venv.
Then use path/to/venv/bin/python and path/to/venv/bin/pip. Make
sure you have python3-full installed.
For more information visit http://rptl.io/venv
note: If you believe this is a mistake, please contact your Python installation or OS distribution provider. You can override this, at the risk of breaking your Python installation or OS, by passing --break-system-packages.
hint: See PEP 668 for the detailed specification.
We therefore create a virtual environment for the extra packages we need. The virtual environment must also inherit all of the system environment's packages (above all picamera2), which is what the --system-site-packages flag below does.
python3 -m venv --system-site-packages diy_camera
source diy_camera/bin/activate
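As a quick, optional sanity check, you can confirm that the virtual environment really does see the system-installed picamera2:
python3 -c "from picamera2 import Picamera2; print('picamera2 OK')"
If this prints picamera2 OK, the --system-site-packages inheritance is working.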
We then install whatever other packages we need inside the diy_camera virtual environment.
While we're at it, point pip at a faster mirror:
pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple/
Now let's write the simplest possible picamera2 test program.
A minimal test program
from picamera2 import Picamera2, Preview
import time
picam2 = Picamera2()
camera_config = picam2.create_preview_configuration()
picam2.configure(camera_config)
picam2.start_preview(Preview.QTGL)
picam2.start()
time.sleep(2)
picam2.capture_file("test.jpg")
The argument to picam2.start_preview can be Preview.QTGL, as above, or Preview.DRM. The picamera2 manual says: "Non X Windows users should use the same script, but replacing Preview.QTGL by Preview.DRM, so as to use the non X Windows preview implementation."
In other words, on a non X Windows system (a Linux system without a graphical display server), you should use Preview.DRM instead of Preview.QTGL. Specifically:
Preview.QTGL is the preview type for systems running X Windows; it relies on the X display to show the preview window. Preview.DRM is the preview type for systems without X Windows; it renders through the Direct Rendering Manager (DRM), the Linux kernel's graphics interface, so it can display images with no display server at all.
So for scenarios without X Windows (for example a headless server or an embedded system with no GUI), use Preview.DRM:
from picamera2 import Picamera2, Preview
import time
picam2 = Picamera2()
camera_config = picam2.create_preview_configuration()
picam2.configure(camera_config)
picam2.start_preview(Preview.DRM)
picam2.start()
time.sleep(2)
picam2.capture_file("test.jpg")
picamera2's high-level API
The capture above can be made even simpler by using picamera2's high-level API, which wraps the whole capture flow:
from picamera2 import Picamera2
picam2 = Picamera2()
picam2.start_and_capture_file("test.jpg")
This captures a single full-resolution still.
picamera2 examples
Averaging multiple frames to reduce noise
#!/usr/bin/python3
"""Example comparing capturing a single photo vs capturing multiple photos and averaging to try to reduce noise"""
import time

import numpy as np
from PIL import Image

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.NULL)
capture_config = picam2.create_still_configuration()
picam2.configure(capture_config)
picam2.start()
time.sleep(2)

with picam2.controls as ctrl:
    ctrl.AnalogueGain = 1.0
    ctrl.ExposureTime = 250000
time.sleep(2)

imgs = 3  # Capture 3 images to average
sumv = None
for i in range(imgs):
    if sumv is None:
        sumv = np.longdouble(picam2.capture_array())
        img = Image.fromarray(np.uint8(sumv))
        img.save("original.tif")
    else:
        sumv += np.longdouble(picam2.capture_array())

img = Image.fromarray(np.uint8(sumv / imgs))
img.save("averaged.tif")
Capturing a raw (DNG) and a JPEG at the same time
#!/usr/bin/python3
# Capture a DNG and a JPEG made from the same raw data.
import time
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
preview_config = picam2.create_preview_configuration()
capture_config = picam2.create_still_configuration(raw={})
picam2.configure(preview_config)
picam2.start()
time.sleep(2)
buffers, metadata = picam2.switch_mode_and_capture_buffers(capture_config, ["main", "raw"])
picam2.helpers.save(picam2.helpers.make_image(buffers[0], capture_config["main"]), metadata, "full.jpg")
picam2.helpers.save_dng(buffers[1], metadata, capture_config["raw"], "full.dng")
It can also be written like this:
#!/usr/bin/python3
# Capture a DNG and a JPEG made from the same raw data.
import time
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
preview_config = picam2.create_preview_configuration()
capture_config = picam2.create_still_configuration(raw={}, display=None)
picam2.configure(preview_config)
picam2.start()
time.sleep(2)
r = picam2.switch_mode_capture_request_and_stop(capture_config)
r.save("main", "full.jpg")
r.save_dng("full.dng")
Capturing a JPEG
#!/usr/bin/python3
# Capture a JPEG while still running in the preview mode. When you
# capture to a file, the return value is the metadata for that image.
import time
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
preview_config = picam2.create_preview_configuration(main={"size": (800, 600)})
picam2.configure(preview_config)
picam2.start_preview(Preview.QTGL)
picam2.start()
time.sleep(2)
metadata = picam2.capture_file("test.jpg")
print(metadata)
picam2.close()
Capturing a PNG
#!/usr/bin/python3
# Capture a PNG while still running in the preview mode.
import time
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
preview_config = picam2.create_preview_configuration(main={"size": (800, 600)})
picam2.configure(preview_config)
picam2.start()
time.sleep(2)
picam2.capture_file("test.png")
Capturing a sequence of photos
#!/usr/bin/python3
import time
from picamera2 import Picamera2
picam2 = Picamera2()
picam2.configure("still")
picam2.start()
# Give time for Aec and Awb to settle, before disabling them
time.sleep(1)
picam2.set_controls({"AeEnable": False, "AwbEnable": False, "FrameRate": 1.0})
# And wait for those settings to take effect
time.sleep(1)
start_time = time.time()
for i in range(1, 51):
    r = picam2.capture_request()
    r.save("main", f"image{i}.jpg")
    r.release()
    print(f"Captured image {i} of 50 at {time.time() - start_time:.2f}s")
picam2.stop()
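If you want to turn the 50 stills into a quick timelapse, ffmpeg can do it in one line (assuming ffmpeg is installed; this is not part of the picamera2 example):
ffmpeg -framerate 10 -i image%d.jpg timelapse.mp4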
Capturing image data to a memory buffer
#!/usr/bin/python3
import io
import time
from picamera2 import Picamera2
picam2 = Picamera2()
capture_config = picam2.create_still_configuration()
picam2.configure(picam2.create_preview_configuration())
picam2.start()
time.sleep(1)
data = io.BytesIO()
picam2.capture_file(data, format='jpeg')
print(data.getbuffer().nbytes)
time.sleep(1)
data = io.BytesIO()
picam2.switch_mode_and_capture_file(capture_config, data, format='jpeg')
print(data.getbuffer().nbytes)
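To confirm that the buffer really holds a complete JPEG, you can decode it in memory. A minimal sketch using Pillow (assuming Pillow is available in the environment):

import io
from PIL import Image
from picamera2 import Picamera2

picam2 = Picamera2()
picam2.start()
data = io.BytesIO()
picam2.capture_file(data, format='jpeg')
data.seek(0)                  # rewind the buffer before decoding
img = Image.open(data)
print(img.format, img.size)   # e.g. JPEG (640, 480)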
Modifying the camera's capture controls
######### Approach 1
#!/usr/bin/python3
# Example of setting controls. Here, after one second, we fix the AGC/AEC
# to the values it has reached whereafter it will no longer change.
import time
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)
picam2.start()
time.sleep(1)
metadata = picam2.capture_metadata()
print(f'metadata = {metadata}')
controls = {c: metadata[c] for c in ["ExposureTime", "AnalogueGain", "ColourGains"]}
print(controls)
picam2.set_controls(controls)
time.sleep(5)
########## Approach 2
#!/usr/bin/python3
# Another (simpler!) way to fix the AEC/AGC and AWB.
import time
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)
picam2.start()
time.sleep(1)
picam2.set_controls({"AwbEnable": 0, "AeEnable": 0})
time.sleep(5)
######## Approach 3
#!/usr/bin/python3
# Example of setting controls using the "direct" attribute method.
import time
from picamera2 import Picamera2, Preview
from picamera2.controls import Controls
picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)
picam2.start()
time.sleep(1)
with picam2.controls as ctrl:
    ctrl.AnalogueGain = 6.0
    ctrl.ExposureTime = 60000
time.sleep(2)
ctrls = Controls(picam2)
ctrls.AnalogueGain = 1.0
ctrls.ExposureTime = 10000
picam2.set_controls(ctrls)
time.sleep(2)
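Whichever of the three approaches you use, controls are applied asynchronously and can take a few frames to settle. A small sketch (assuming the default preview configuration) that reads the values back from frame metadata to verify them:

import time
from picamera2 import Picamera2

picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration())
picam2.start()
picam2.set_controls({"AnalogueGain": 1.0, "ExposureTime": 10000})
time.sleep(1)  # give the new controls a few frames to take effect
metadata = picam2.capture_metadata()
print(metadata["ExposureTime"], metadata["AnalogueGain"])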
The simplest way to capture images
#!/usr/bin/python3
from picamera2 import Picamera2
picam2 = Picamera2()
# Capture one image with the default configurations.
picam2.start_and_capture_file("test.jpg")
# Capture 3 images. Use a 0.5 second delay after the first image.
picam2.start_and_capture_files("test{:d}.jpg", num_files=3, delay=0.5) # noqa
Setting controls for the preview
#!/usr/bin/python3
# Start camera with fixed exposure and gain.
import time
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
controls = {"ExposureTime": 10000, "AnalogueGain": 1.0}
preview_config = picam2.create_preview_configuration(controls=controls)
picam2.configure(preview_config)
picam2.start()
time.sleep(5)
A FrameServer that multiple threads can pull frames from
#!/usr/bin/python3

# These two are only needed for the demo code below the FrameServer class.
import time
from threading import Condition, Thread

from picamera2 import Picamera2


class FrameServer:
    def __init__(self, picam2, stream='main'):
        """A simple class that can serve up frames from one of the Picamera2's configured streams to multiple other threads.

        Pass in the Picamera2 object and the name of the stream for which you want
        to serve up frames.
        """
        self._picam2 = picam2
        self._stream = stream
        self._array = None
        self._condition = Condition()
        self._running = True
        self._count = 0
        self._thread = Thread(target=self._thread_func, daemon=True)

    @property
    def count(self):
        """A count of the number of frames received."""
        return self._count

    def start(self):
        """To start the FrameServer, you will also need to start the Picamera2 object."""
        self._thread.start()

    def stop(self):
        """To stop the FrameServer

        First stop any client threads (that might be
        blocked in wait_for_frame), then call this stop method. Don't stop the
        Picamera2 object until the FrameServer has been stopped.
        """
        self._running = False
        self._thread.join()

    def _thread_func(self):
        while self._running:
            array = self._picam2.capture_array(self._stream)
            self._count += 1
            with self._condition:
                self._array = array
                self._condition.notify_all()

    def wait_for_frame(self, previous=None):
        """You may optionally pass in the previous frame that you got last time you called this function.

        This will guarantee that you don't get duplicate frames
        returned in the event of spurious wake-ups, and it may even return more
        quickly in the case where a new frame has already arrived.
        """
        with self._condition:
            if previous is not None and self._array is not previous:
                return self._array
            while True:
                self._condition.wait()
                if self._array is not previous:
                    return self._array


# Below here is just demo code that uses the class:

def thread1_func():
    global thread1_count
    while not thread_abort:
        _ = server.wait_for_frame()
        thread1_count += 1


def thread2_func():
    global thread2_count
    frame = None
    while not thread_abort:
        frame = server.wait_for_frame(frame)
        thread2_count += 1


thread_abort = False
thread1_count = 0
thread2_count = 0
thread1 = Thread(target=thread1_func)
thread2 = Thread(target=thread2_func)

picam2 = Picamera2()
server = FrameServer(picam2)
thread1.start()
thread2.start()
server.start()
picam2.start()

time.sleep(5)

thread_abort = True
thread1.join()
thread2.join()
server.stop()
picam2.stop()

print("Thread1 received", thread1_count, "frames")
print("Thread2 received", thread2_count, "frames")
print("Server received", server.count, "frames")
Face detection with OpenCV
We install OpenCV at the system level; the virtual environment created earlier inherits it thanks to --system-site-packages.
sudo apt install -y python3-opencv
sudo apt install -y opencv-data
#!/usr/bin/python3
import cv2
from picamera2 import Picamera2
# Grab images as numpy arrays and leave everything else to OpenCV.
face_detector = cv2.CascadeClassifier("/usr/share/opencv4/haarcascades/haarcascade_frontalface_default.xml")
cv2.startWindowThread()
picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration(main={"format": 'XRGB8888', "size": (640, 480)}))
picam2.start()
while True:
    im = picam2.capture_array()
    grey = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
    faces = face_detector.detectMultiScale(grey, 1.1, 5)
    for (x, y, w, h) in faces:
        cv2.rectangle(im, (x, y), (x + w, y + h), (0, 255, 0))
    cv2.imshow("Camera", im)
    cv2.waitKey(1)
The code above doesn't perform particularly well. The version below uses the picam2.post_callback interface instead to draw the face boxes with much better performance.
#!/usr/bin/python3
import time

import cv2

from picamera2 import MappedArray, Picamera2, Preview

# This version creates a lores YUV stream, extracts the Y channel and runs the face
# detector directly on that. We use the supplied OpenGL accelerated preview window
# and delegate the face box drawing to its callback function, thereby running the
# preview at the full rate with face updates as and when they are ready.

face_detector = cv2.CascadeClassifier("/usr/share/opencv4/haarcascades/haarcascade_frontalface_default.xml")


def draw_faces(request):
    with MappedArray(request, "main") as m:
        for f in faces:
            # Scale the face rectangle from lores to main stream coordinates.
            (x, y, w, h) = [c * n // d for c, n, d in zip(f, (w0, h0) * 2, (w1, h1) * 2)]
            cv2.rectangle(m.array, (x, y), (x + w, y + h), (0, 255, 0, 0))


picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
config = picam2.create_preview_configuration(main={"size": (640, 480)},
                                             lores={"size": (320, 240), "format": "YUV420"})
picam2.configure(config)
(w0, h0) = picam2.stream_configuration("main")["size"]
(w1, h1) = picam2.stream_configuration("lores")["size"]
s1 = picam2.stream_configuration("lores")["stride"]
faces = []
picam2.post_callback = draw_faces

picam2.start()

start_time = time.monotonic()
# Run the detection loop for 100 seconds.
while time.monotonic() - start_time < 100:
    buffer = picam2.capture_buffer("lores")
    grey = buffer[:s1 * h1].reshape((h1, s1))
    faces = face_detector.detectMultiScale(grey, 1.1, 3)
HDR by fusing short, normal and long exposures
#!/usr/bin/python3
import time
import cv2
import numpy as np
from picamera2 import Picamera2
# Simple Mertens merge with 3 exposures. No image alignment or anything fancy.
RATIO = 3.0
picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration())
picam2.start()
# Run for a second to get a reasonable "middle" exposure level.
time.sleep(1)
metadata = picam2.capture_metadata()
exposure_normal = metadata["ExposureTime"]
gain = metadata["AnalogueGain"] * metadata["DigitalGain"]
picam2.stop()
controls = {"ExposureTime": exposure_normal, "AnalogueGain": gain}
capture_config = picam2.create_preview_configuration(main={"size": (1920, 1080),
"format": "RGB888"},
controls=controls)
picam2.configure(capture_config)
picam2.start()
normal = picam2.capture_array()
picam2.stop()
st = time.time()
exposure_short = int(exposure_normal / RATIO)
picam2.set_controls({"ExposureTime": exposure_short, "AnalogueGain": gain})
picam2.start()
short = picam2.capture_array()
picam2.stop()
print(f'elapsed time: {time.time() - st}', flush=True)
exposure_long = int(exposure_normal * RATIO)
picam2.set_controls({"ExposureTime": exposure_long, "AnalogueGain": gain})
picam2.start()
long = picam2.capture_array()
picam2.stop()
merge = cv2.createMergeMertens()
merged = merge.process([short, normal, long])
merged = np.clip(merged * 255, 0, 255).astype(np.uint8)
cv2.imwrite("normal.jpg", normal)
cv2.imwrite("merged.jpg", merged)
Comparing merged.jpg with normal.jpg, the merged image suppresses the lamp highlights better, lifts the shadows, and shows more contrast. Looking at the histograms, the merged image's histogram is shifted to the right overall, meaning it is brighter, and it has a bump in the shadow region, which raises the overall contrast.
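The histogram claim is easy to check numerically. A sketch with OpenCV over the two files produced above:

import cv2
import numpy as np

for name in ("normal.jpg", "merged.jpg"):
    grey = cv2.imread(name, cv2.IMREAD_GRAYSCALE)
    # 256-bin luminance histogram of the greyscale image.
    hist = cv2.calcHist([grey], [0], None, [256], [0, 256]).ravel()
    print(f"{name}: mean luminance {grey.mean():.1f}, "
          f"pixels in darkest quarter {int(hist[:64].sum())}")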
Raw-image APIs
#!/usr/bin/python3
# Configure a raw stream and capture an image from it.
import time
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
preview_config = picam2.create_preview_configuration(raw={"size": picam2.sensor_resolution})
print(preview_config)
picam2.configure(preview_config)
picam2.start()
time.sleep(2)
raw = picam2.capture_array("raw")
print(raw.shape)
print(picam2.stream_configuration("raw"))
Flipping the preview image
#!/usr/bin/python3
# Run the camera with a 180 degree rotation.
import time
import libcamera
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
preview_config = picam2.create_preview_configuration()
preview_config["transform"] = libcamera.Transform(hflip=1, vflip=1)
picam2.configure(preview_config)
picam2.start()
time.sleep(5)
Configuration for still capture
#!/usr/bin/python3
# Use the configuration structure method to do a full res capture.
import time
from picamera2 import Picamera2
picam2 = Picamera2()
# We don't really need to change anything, but let's mess around just as a test.
picam2.preview_configuration.size = (800, 600)
picam2.preview_configuration.format = "YUV420"
picam2.still_configuration.size = (1600, 1200)
picam2.still_configuration.enable_raw()
picam2.still_configuration.raw.size = picam2.sensor_resolution
picam2.start("preview", show_preview=True)
time.sleep(2)
picam2.switch_mode_and_capture_file("still", "test_full.jpg")
Showing exposure values and other data in the preview window
#!/usr/bin/python3
import time
from picamera2 import Picamera2
picam2 = Picamera2()
picam2.start(show_preview=True)
time.sleep(0.5)
# Or you could do this before starting the camera.
picam2.title_fields = ["ExposureTime", "AnalogueGain", "DigitalGain"]
time.sleep(2)
# And you can change it too.
picam2.title_fields = ["ColourTemperature", "ColourGains"]
time.sleep(2)
yuv2rgb
#!/usr/bin/python3
import cv2
from picamera2 import Picamera2
cv2.startWindowThread()
picam2 = Picamera2()
config = picam2.create_preview_configuration(lores={"size": (640, 480)})
picam2.configure(config)
picam2.start()
while True:
    yuv420 = picam2.capture_array("lores")
    rgb = cv2.cvtColor(yuv420, cv2.COLOR_YUV420p2RGB)
    cv2.imshow("Camera", rgb)
Here, lores refers to the low-resolution stream.
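Because YUV420 stores a full-resolution Y plane followed by subsampled U and V planes, the captured array has 1.5x as many rows as the final RGB image. A sketch that makes the shapes visible (the row width may exceed 640 if the stride is padded):

import cv2
from picamera2 import Picamera2

picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration(lores={"size": (640, 480)}))
picam2.start()

yuv420 = picam2.capture_array("lores")
print(yuv420.shape)   # about (720, 640): 480 * 3 / 2 rows (Y plane plus subsampled U and V)
rgb = cv2.cvtColor(yuv420, cv2.COLOR_YUV420p2RGB)
print(rgb.shape)      # (480, 640, 3)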
Zoom control
#!/usr/bin/python3
# How to do digital zoom using the "ScalerCrop" control.
import time
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)
picam2.start()
time.sleep(2)
size = picam2.capture_metadata()['ScalerCrop'][2:]
full_res = picam2.camera_properties['PixelArraySize']
for _ in range(20):
    # This syncs us to the arrival of a new camera frame:
    picam2.capture_metadata()
    size = [int(s * 0.95) for s in size]
    offset = [(r - s) // 2 for r, s in zip(full_res, size)]
    picam2.set_controls({"ScalerCrop": offset + size})
time.sleep(2)
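ScalerCrop is an [x_offset, y_offset, width, height] rectangle in sensor pixel coordinates, so zooming back out just means restoring the full crop. A sketch that could be appended to the script above:

# Zoom back out by restoring the full sensor crop (sketch, continuing the script above).
full_res = picam2.camera_properties['PixelArraySize']
picam2.set_controls({"ScalerCrop": [0, 0] + list(full_res)})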
TensorFlow segmentation
First, install the TensorFlow Lite runtime (inside the virtual environment):
source diy_camera/bin/activate
pip install tflite-runtime
#!/usr/bin/python3

# segmentation.py
# Usage: ./segmentation.py --model deeplapv3.tflite --label deeplab_labels.txt

import argparse
import select
import sys
import time

import cv2
import numpy as np
import tflite_runtime.interpreter as tflite
from PIL import Image

from picamera2 import Picamera2, Preview

normalSize = (640, 480)
lowresSize = (320, 240)

masks = {}
captured = []
segmenter = None


def ReadLabelFile(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret


def InferenceTensorFlow(image, model, colours, label=None):
    global masks

    if label:
        labels = ReadLabelFile(label)
    else:
        labels = None

    interpreter = tflite.Interpreter(model_path=model, num_threads=4)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    height = input_details[0]['shape'][1]
    width = input_details[0]['shape'][2]
    o_height = output_details[0]['shape'][1]
    o_width = output_details[0]['shape'][2]
    floating_model = False
    if input_details[0]['dtype'] == np.float32:
        floating_model = True

    rgb = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
    picture = cv2.resize(rgb, (width, height))

    input_data = np.expand_dims(picture, axis=0)
    if floating_model:
        input_data = np.float32(input_data / 255)

    interpreter.set_tensor(input_details[0]['index'], input_data)

    interpreter.invoke()

    output = interpreter.get_tensor(output_details[0]['index'])[0]
    mask = np.argmax(output, axis=-1)
    found_indices = np.unique(mask)
    colours = np.loadtxt(colours)
    new_masks = {}
    for i in found_indices:
        if i == 0:
            continue
        output_shape = [o_width, o_height, 4]
        colour = [(0, 0, 0, 0), colours[i]]
        overlay = (mask == i).astype(np.uint8)
        overlay = np.array(colour)[overlay].reshape(
            output_shape).astype(np.uint8)
        overlay = cv2.resize(overlay, normalSize)
        if labels is not None:
            new_masks[labels[i]] = overlay
        else:
            new_masks[i] = overlay
    masks = new_masks
    print("Found", masks.keys())


def capture_image_and_masks(picam2: Picamera2, model, colour_file, label_file):
    global masks
    # Disable Aec and Awb so all images have the same exposure and colour gains
    picam2.set_controls({"AeEnable": False, "AwbEnable": False})
    time.sleep(1.0)
    request = picam2.capture_request()
    image = request.make_image("main")
    lores = request.make_buffer("lores")
    stride = picam2.stream_configuration("lores")["stride"]
    grey = lores[:stride * lowresSize[1]].reshape((lowresSize[1], stride))
    InferenceTensorFlow(grey, model, colour_file, label_file)
    for k, v in masks.items():
        comp = np.array([0, 0, 0, 0]).reshape(1, 1, 4)
        mask = (~((v == comp).all(axis=-1)) * 255).astype(np.uint8)
        label = k
        label = label.replace(" ", "_")
        if label in captured:
            label = f"{label}{sum(label in x for x in captured)}"
        cv2.imwrite(f"mask_{label}.png", mask)
        image.save(f"img_{label}.png")
        captured.append(label)
    print(masks.keys())


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model', help='Path of the segmentation model.', required=True)
    parser.add_argument('--label', help='Path of the labels file.')
    parser.add_argument('--colours', help='File path of the label colours.')
    parser.add_argument('--output', help='File path of the output image.')
    args = parser.parse_args()

    if args.output:
        output_file = args.output
    else:
        output_file = 'out.png'

    if args.label:
        label_file = args.label
    else:
        label_file = None

    if args.colours:
        colour_file = args.colours
    else:
        colour_file = "colours.txt"

    picam2 = Picamera2()
    picam2.start_preview(Preview.QTGL)
    config = picam2.create_preview_configuration(main={"size": normalSize},
                                                 lores={"size": lowresSize, "format": "YUV420"})
    picam2.configure(config)

    stride = picam2.stream_configuration("lores")["stride"]
    picam2.start()

    try:
        while True:
            buffer = picam2.capture_buffer("lores")
            grey = buffer[:stride * lowresSize[1]].reshape((lowresSize[1], stride))
            InferenceTensorFlow(grey, args.model, colour_file, label_file)
            overlay = np.zeros((normalSize[1], normalSize[0], 4), dtype=np.uint8)
            global masks
            for v in masks.values():
                overlay += v
            # Set Alphas and overlay
            overlay[:, :, -1][overlay[:, :, -1] == 255] = 150
            picam2.set_overlay(overlay)
            # Check if enter has been pressed
            i, o, e = select.select([sys.stdin], [], [], 0.1)
            if i:
                input()
                capture_image_and_masks(picam2, args.model, colour_file, label_file)
                picam2.stop()
                if input("Continue (y/n)?").lower() == "n":
                    raise KeyboardInterrupt
                picam2.start()
    except KeyboardInterrupt:
        print(f"Have captured {captured}")
        todo = input("What to composite?")
        bg = input("Which image to use as background (empty for none)?")
        todo = todo.split()
        images = []
        masks = []
        if bg:
            base_image = Image.open(f"img_{bg}.png")
        else:
            base_image = np.zeros((normalSize[1], normalSize[0], 3), dtype=np.uint8)
            base_image = Image.fromarray(base_image)
        for item in todo:
            images.append(Image.open(f"img_{item}.png"))
            masks.append(Image.open(f"mask_{item}.png"))
        for i in range(len(masks)):
            base_image = Image.composite(images[i], base_image, masks[i])
        base_image.save(output_file)


if __name__ == '__main__':
    main()
You need to download all of the files from https://github.com/raspberrypi/picamera2/tree/main/examples/tensorflow.
Then run from the command line:
python segmentation.py --model deeplapv3.tflite --label deeplab_labels.txt
Overall, the results are underwhelming: the segmentation edges are not precise, and too few kinds of objects can be segmented.
TFLite detection
#!/usr/bin/python3

# Copyright (c) 2022 Raspberry Pi Ltd
# Author: Alasdair Allan <alasdair@raspberrypi.com>
# SPDX-License-Identifier: BSD-3-Clause

# A TensorFlow Lite example for Picamera2 on Raspberry Pi OS Bullseye
#
# Install necessary dependences before starting,
#
# $ sudo apt update
# $ sudo apt install build-essential
# $ sudo apt install libatlas-base-dev
# $ sudo apt install python3-pip
# $ pip3 install tflite-runtime
# $ pip3 install opencv-python==4.4.0.46
# $ pip3 install pillow
# $ pip3 install numpy
#
# and run from the command line,
#
# $ python3 real_time_with_labels.py --model mobilenet_v2.tflite --label coco_labels.txt

import argparse

import cv2
import numpy as np
import tflite_runtime.interpreter as tflite

from picamera2 import MappedArray, Picamera2, Preview

normalSize = (640, 480)
lowresSize = (320, 240)

rectangles = []


def ReadLabelFile(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret


def DrawRectangles(request):
    with MappedArray(request, "main") as m:
        for rect in rectangles:
            print(rect)
            rect_start = (int(rect[0] * 2) - 5, int(rect[1] * 2) - 5)
            rect_end = (int(rect[2] * 2) + 5, int(rect[3] * 2) + 5)
            cv2.rectangle(m.array, rect_start, rect_end, (0, 255, 0, 0))
            if len(rect) == 5:
                text = rect[4]
                font = cv2.FONT_HERSHEY_SIMPLEX
                cv2.putText(m.array, text, (int(rect[0] * 2) + 10, int(rect[1] * 2) + 10),
                            font, 1, (255, 255, 255), 2, cv2.LINE_AA)


def InferenceTensorFlow(image, model, output, label=None):
    global rectangles

    if label:
        labels = ReadLabelFile(label)
    else:
        labels = None

    interpreter = tflite.Interpreter(model_path=model, num_threads=4)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    height = input_details[0]['shape'][1]
    width = input_details[0]['shape'][2]
    floating_model = False
    if input_details[0]['dtype'] == np.float32:
        floating_model = True

    rgb = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
    initial_h, initial_w, channels = rgb.shape

    picture = cv2.resize(rgb, (width, height))

    input_data = np.expand_dims(picture, axis=0)
    if floating_model:
        input_data = (np.float32(input_data) - 127.5) / 127.5

    interpreter.set_tensor(input_details[0]['index'], input_data)

    interpreter.invoke()

    detected_boxes = interpreter.get_tensor(output_details[0]['index'])
    detected_classes = interpreter.get_tensor(output_details[1]['index'])
    detected_scores = interpreter.get_tensor(output_details[2]['index'])
    num_boxes = interpreter.get_tensor(output_details[3]['index'])

    rectangles = []
    for i in range(int(num_boxes)):
        top, left, bottom, right = detected_boxes[0][i]
        classId = int(detected_classes[0][i])
        score = detected_scores[0][i]
        if score > 0.5:
            xmin = left * initial_w
            ymin = bottom * initial_h
            xmax = right * initial_w
            ymax = top * initial_h
            box = [xmin, ymin, xmax, ymax]
            rectangles.append(box)
            if labels:
                print(labels[classId], 'score = ', score)
                rectangles[-1].append(labels[classId])
            else:
                print('score = ', score)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model', help='Path of the detection model.', required=True)
    parser.add_argument('--label', help='Path of the labels file.')
    parser.add_argument('--output', help='File path of the output image.')
    args = parser.parse_args()

    if args.output:
        output_file = args.output
    else:
        output_file = 'out.jpg'

    if args.label:
        label_file = args.label
    else:
        label_file = None

    picam2 = Picamera2()
    picam2.start_preview(Preview.QTGL)
    config = picam2.create_preview_configuration(main={"size": normalSize},
                                                 lores={"size": lowresSize, "format": "YUV420"})
    picam2.configure(config)

    stride = picam2.stream_configuration("lores")["stride"]
    picam2.post_callback = DrawRectangles

    picam2.start()

    while True:
        buffer = picam2.capture_buffer("lores")
        grey = buffer[:stride * lowresSize[1]].reshape((lowresSize[1], stride))
        _ = InferenceTensorFlow(grey, args.model, output_file, label_file)


if __name__ == '__main__':
    main()
Overall, the real-time performance is quite good, but the detection accuracy is poor.