Raspberry Pi PicoでDHT11センサーを活用!温湿度データの取得と表示方法

目次

はじめに

温度や湿度など身近なものだが目に見えないものを可視化することは非常に有益です。すでに身近に計測器がたくさん存在していますので、計測した値の確からしさを容易に検証できることもおすすめする理由の一つです。

温度/湿度センサの仕組み

温度センサー

温度の測定には、サーミスタ(熱感知抵抗)やシリコンバンドギャップセンサと呼ばれる部品を使用しています。
サーミスタは、温度によって抵抗が変わる特殊な抵抗器です。温度が上昇すると抵抗値が変化し、この変化から温度を計算します。
シリコンバンドギャップセンサは、シリコンをベースにした温度センサで、温度に応じて電流や電圧が変わる性質を利用して、非常に正確な温度を計測できます。

湿度センサ―

容量性湿度センサは、2つの電極の間に吸湿性の材料が挟まれた構造を持っています。この材料が空気中の水分を吸収すると、その容量(キャパシタンス)が変化します。
湿度が高いほど、この材料が多くの水分を吸収し、容量が増加します。センサはこの容量の変化を検出し、それを湿度として計算します。湿度センサは非常に敏感で、短時間で周囲の湿度の変化を捉えることができます。

配線

配線は非常にシンプルです。

RaspberryPi PicoDHT11
VSYS(39番)VCC
GND(38番)GND
GP15(20番)OUT
()内はピン番号

DTH11センサ値をOLEDディスプレイに表示する

温度と湿度の値ををOLEDディスプレイに表示します。


ツール→パッケージを管理からDHT11を検索してインストールください。パッケージが見つからない場合は、以下をdht.pyとして保存してください。
以下は、DHT11センサから取得したデジタル信号(パルス)を解析し、温度と湿度のデータに変換するものです。さらに、そのデータを検証しています。DHT11センサは、温度と湿度の値をデジタル信号として送信しますが、この信号を処理して意味のあるデータに変換することがこのプログラム(クラス)の役割です。

import array

import micropython
import utime
from machine import Pin
from micropython import const


class InvalidChecksum(Exception):
    pass


class InvalidPulseCount(Exception):
    pass


MAX_UNCHANGED = const(100)
MIN_INTERVAL_US = const(200000)
HIGH_LEVEL = const(50)
EXPECTED_PULSES = const(84)


class DHT11:
    _temperature: int
    _humidity: int

    def __init__(self, pin):
        self._pin = pin
        self._last_measure = utime.ticks_us()
        self._temperature = -1
        self._humidity = -1

    def measure(self):
        current_ticks = utime.ticks_us()
        if utime.ticks_diff(current_ticks, self._last_measure) < MIN_INTERVAL_US and (
            self._temperature > -1 or self._humidity > -1
        ):
            # Less than a second since last read, which is too soon according
            # to the datasheet
            return

        self._send_init_signal()
        pulses = self._capture_pulses()
        buffer = self._convert_pulses_to_buffer(pulses)
        self._verify_checksum(buffer)

        self._humidity = buffer[0] + buffer[1] / 10
        self._temperature = buffer[2] + buffer[3] / 10
        self._last_measure = utime.ticks_us()

    @property
    def humidity(self):
        self.measure()
        return self._humidity

    @property
    def temperature(self):
        self.measure()
        return self._temperature

    def _send_init_signal(self):
        self._pin.init(Pin.OUT, Pin.PULL_DOWN)
        self._pin.value(1)
        utime.sleep_ms(50)
        self._pin.value(0)
        utime.sleep_ms(18)

    @micropython.native
    def _capture_pulses(self):
        pin = self._pin
        pin.init(Pin.IN, Pin.PULL_UP)

        val = 1
        idx = 0
        transitions = bytearray(EXPECTED_PULSES)
        unchanged = 0
        timestamp = utime.ticks_us()

        while unchanged < MAX_UNCHANGED:
            if val != pin.value():
                if idx >= EXPECTED_PULSES:
                    raise InvalidPulseCount(
                        "Got more than {} pulses".format(EXPECTED_PULSES)
                    )
                now = utime.ticks_us()
                transitions[idx] = now - timestamp
                timestamp = now
                idx += 1

                val = 1 - val
                unchanged = 0
            else:
                unchanged += 1
        pin.init(Pin.OUT, Pin.PULL_DOWN)
        if idx != EXPECTED_PULSES:
            raise InvalidPulseCount(
                "Expected {} but got {} pulses".format(EXPECTED_PULSES, idx)
            )
        return transitions[4:]

    def _convert_pulses_to_buffer(self, pulses):
        """Convert a list of 80 pulses into a 5 byte buffer

        The resulting 5 bytes in the buffer will be:
            0: Integral relative humidity data
            1: Decimal relative humidity data
            2: Integral temperature data
            3: Decimal temperature data
            4: Checksum
        """
        # Convert the pulses to 40 bits
        binary = 0
        for idx in range(0, len(pulses), 2):
            binary = binary << 1 | int(pulses[idx] > HIGH_LEVEL)

        # Split into 5 bytes
        buffer = array.array("B")
        for shift in range(4, -1, -1):
            buffer.append(binary >> shift * 8 & 0xFF)
        return buffer

    def _verify_checksum(self, buffer):
        # Calculate checksum
        checksum = 0
        for buf in buffer[0:4]:
            checksum += buf
        if checksum & 0xFF != buffer[4]:
            raise InvalidChecksum()

以下がメインのプログラムです。

from machine import Pin, I2C
import ssd1306
import utime as time
from dht import DHT11, InvalidChecksum

# DHT11センサの設定
DHTPin = Pin(15, Pin.OUT, Pin.PULL_DOWN)

# I2Cの設定 (SDA = GP16, SCL = GP17)
i2c = I2C(0, scl=Pin(17), sda=Pin(16), freq=400000)

# I2Cデバイスのアドレス確認
devices = i2c.scan()
if devices:
    print("I2Cデバイスが見つかりました:")
    for device in devices:
        print("デバイスアドレス: ", hex(device))
else:
    print("I2Cデバイスが見つかりませんでした")

# OLEDの設定(幅128ピクセル、高さ64ピクセル)
try:
    print("OLED初期化開始")
    oled = ssd1306.SSD1306_I2C(128, 64, i2c)
    print("OLED初期化成功")
except Exception as e:
    print("OLED初期化エラー:", e)
    raise e

# メインループ
while True:
    try:
        # DHT11センサのデータを読み取る
        sensor = DHT11(DHTPin)
        time.sleep(2)  # 読み取りの前に少し待機

        # 温度と湿度を取得
        t = sensor.temperature
        h = sensor.humidity

        # コンソールに表示
        print("Temperature: " + str(t))
        print("Humidity: " + str(h))

        # OLEDディスプレイに表示
        oled.fill(0)  # 画面をクリア
        oled.text("Temp: {} C".format(t), 0, 0)
        oled.text("Humidity: {} %".format(h), 0, 10)
        oled.show()  # 画面を更新

    except InvalidChecksum:
        print("センサデータの読み取りに失敗しました。")

    time.sleep(2)  # 2秒ごとにデータを更新
よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

コメント

コメントする

目次