Project

General

Profile

Actions

ToDo #10

open
TO TO

ToDo #8: 組込みシステムとNostr

Raspberry Pi 側実装案

ToDo #10: Raspberry Pi 側実装案

Added by Tomohiko Okamoto about 1 month ago. Updated 7 days ago.

Status:
進行中
Priority:
通常
Start date:
11/04/2025
Due date:
% Done:

50%

Estimated time:

Description

・Pythonで実装
・第1弾はDHT11で温湿度を取得・送信する。
・以下の処理はライブラリ化したい。
 ・秘密鍵/公開鍵の生成チェック、生成されていない場合は生成。
 ・Nostrリレーサーバーへメタデータ(ユーザー名など)存在チェック、存在しなければメタデータ送信

TO Updated by Tomohiko Okamoto about 1 month ago · Edited Actions #1

  • Status changed from 新規 to 進行中

以下、Geminiとの本件に関するチャットの履歴URL
https://gemini.google.com/share/7cb8dc8298c1

TO Updated by Tomohiko Okamoto about 1 month ago Actions #2

以下、Geminiとの会話の最後のまとめをコピペ。


はい、その通りです。鍵の生成・管理とメタデータの発行は、センサーの種類に依存しないNostrクライアントの基本機能であり、そこだけを独立したユーティリティモジュールとして切り出すのは非常に良い設計です。💡

これにより、将来的にDHT11以外のセンサー(例:BME280、GPSセンサーなど)を使いたい場合でも、そのセンサー固有の読み取りロジック(メインファイル)だけを書き換えれば済み、Nostr関連のコード(ライブラリ)は再利用できます。

以下の構成でコードを提供します。

  1. nostr_key_manager.py: 鍵とメタデータ管理に特化したライブラリファイル
  2. main_dht11.py: センサー固有の処理と、ライブラリを利用したメイン実行ファイル

1. Nostrユーティリティモジュール (nostr_key_manager.py)

Nostrの鍵管理とメタデータチェックに特化したクラスを実装します。

# nostr_key_manager.py

import os
import time
import json
import logging
from pynostr.event import Event
from pynostr.key import PrivateKey
from pynostr.bech32 import encode_pubkey, encode_secret_key
from pynostr.relays import RelayManager

class NostrKeyManager:
    """
    Nostrの秘密鍵の生成/読み込み、およびkind=0 (メタデータ) イベントの
    チェックと公開を管理するユーティリティクラス。
    """
    
    def __init__(self, key_file: str, relay_url: str):
        """
        初期化メソッド。鍵ファイルパスとリレーURLを設定する。
        """
        self.KEY_FILE = key_file
        self.RELAY_URL = relay_url
        self.private_key_hex = None
        self.public_key_hex = None
        
        logging.info("NostrKeyManagerを初期化しました。")

    def setup_keys(self) -> bool:
        """
        鍵ファイルが存在するかチェックし、鍵ペアをロードまたは生成する。
        成功した場合Trueを返す。
        """
        
        if os.path.exists(self.KEY_FILE):
            logging.info(f"既存の鍵ファイル ({self.KEY_FILE}) を読み込みます...")
            try:
                with open(self.KEY_FILE, 'r') as f:
                    nsec = f.read().strip()
                private_key = PrivateKey.from_nsec(nsec)
            except Exception as e:
                logging.error(f"鍵ファイルの読み込み中にエラーが発生しました: {e}")
                return False
        else:
            logging.info("鍵ファイルが見つかりませんでした。新しい鍵ペアを生成します...")
            private_key = PrivateKey()
            nsec_to_save = encode_secret_key(private_key.hex())
            try:
                with open(self.KEY_FILE, 'w') as f:
                    f.write(nsec_to_save + '\n')
                logging.info(f"新しい秘密鍵を {self.KEY_FILE} に保存しました。")
            except Exception as e:
                logging.error(f"秘密鍵の保存中にエラーが発生しました: {e}")
                return False
                
        self.private_key_hex = private_key.hex()
        self.public_key_hex = private_key.public_key.hex()
        npub = encode_pubkey(self.public_key_hex)
        logging.info(f"🔑 鍵の準備完了。 npub: {npub}")
        return True

    def check_and_publish_metadata(self):
        """
        リレーにkind=0イベントが存在するかチェックし、存在しなければ新規送信する。
        """
        if not self.private_key_hex or not self.public_key_hex:
            logging.error("メタデータ送信:鍵情報が設定されていません。")
            return

        private_key = PrivateKey(bytes.fromhex(self.private_key_hex))
        
        relay_manager = RelayManager(timeout=5)
        relay_manager.add_relay(self.RELAY_URL)
        
        # 1. 既存のkind=0イベントの存在チェック (REQ)
        subscription_id = os.urandom(8).hex()
        relay_manager.add_subscription(subscription_id, filters=[{'authors': [self.public_key_hex], 'kinds': [0], 'limit': 1}])
        logging.info(f"リレー ({self.RELAY_URL}) に kind=0 イベントの存在チェックを要求します...")
        time.sleep(3) 
        events = relay_manager.get_events_for_subscription(subscription_id)
        
        if events:
            logging.info("✅ kind=0 メタデータがリレーに既に存在します。スキップします。")
            relay_manager.close_connections() 
            return

        # 2. メタデータが存在しない場合:新規 kind=0 イベントを作成して送信 (EVENT)
        logging.info("🚨 kind=0 メタデータが存在しません。新規作成して送信します。")
        
        # デフォルトのユーザー名を設定
        metadata_content = {
            "name": f"RPi-Sensor_{self.public_key_hex[:4]}",
            "about": "Raspberry Pi からセンサーデータを定期送信しています。",
            "display_name": "RPi-Sensor"
        }
        
        kind0_event = Event(
            public_key=self.public_key_hex,
            content=json.dumps(metadata_content),
            kind=0,
        )
        private_key.sign_event(kind0_event)
        
        relay_manager.publish_event(kind0_event)
        logging.info(f"📡 kind=0 イベント (ユーザー名: {metadata_content['name']}) を送信しました。")
        
        time.sleep(3)
        relay_manager.close_connections()

    def get_private_key_hex(self) -> str | None:
        """秘密鍵のHEX文字列を取得する"""
        return self.private_key_hex

    def get_public_key_hex(self) -> str | None:
        """公開鍵のHEX文字列を取得する"""
        return self.public_key_hex

    def create_kind1_event(self, content: str) -> Event | None:
        """
        与えられたコンテンツでkind=1イベントを作成し、署名する。
        """
        if not self.private_key_hex or not self.public_key_hex:
            logging.error("kind=1 イベント作成:鍵情報が設定されていません。")
            return None
            
        private_key = PrivateKey(bytes.fromhex(self.private_key_hex))
        
        event = Event(public_key=self.public_key_hex, content=content, kind=1)
        private_key.sign_event(event)
        return event

    def publish_event_to_relay(self, event: Event):
        """
        リレーにNostrイベントを送信する。
        """
        relay_manager = RelayManager(timeout=5)
        relay_manager.add_relay(self.RELAY_URL)
        
        relay_manager.publish_event(event)
        logging.info(f"イベントをリレーに送信中... ({self.RELAY_URL})")
        time.sleep(3)
        relay_manager.close_connections()
        
        logging.info(f"投稿完了。Event ID: {event.id[:8]}...")

2. 実行ファイル (main_dht11.py) の作成

センサー固有の読み取り関数と、NostrKeyManagerを利用して温湿度データを送信するメインループを実装します。

# main_dht11.py

import logging
import time
import Adafruit_DHT # センサーライブラリはメインファイルで利用
from pynostr.bech32 import encode_pubkey
from nostr_key_manager import NostrKeyManager # ★ 作成したライブラリをインポート

# --- 設定 ---
LOG_FILE = 'dht_nostr_publisher.log' 
KEY_FILE = 'private_key.txt'
RELAY_URL = "wss://relay.damus.io" 
DHT_SENSOR_TYPE = Adafruit_DHT.DHT11
DHT_PIN_NUMBER = 4
READING_INTERVAL_SECONDS = 300 # 5分

def setup_logging():
    """ロギングシステムを設定する"""
    logging.basicConfig(
        level=logging.INFO, 
        format='%(asctime)s - %(levelname)s - %(message)s', 
        datefmt='%Y-%m-%d %H:%M:%S', 
        handlers=[
            logging.FileHandler(LOG_FILE, mode='a', encoding='utf-8'),
            logging.StreamHandler() 
        ]
    )

def read_dht11_data(dht_sensor, dht_pin) -> tuple[float | None, float | None]:
    """DHT11センサーから温度と湿度を読み取る (センサー固有のロジック)"""
    temperature, humidity = Adafruit_DHT.read_retry(dht_sensor, dht_pin)
    return temperature, humidity

if __name__ == "__main__":
    setup_logging()
    logging.info("--- Raspberry Pi DHT11 to Nostr Publisher START ---")

    # NostrKeyManagerを初期化
    key_manager = NostrKeyManager(
        key_file=KEY_FILE,
        relay_url=RELAY_URL,
    )
    
    # 鍵のセットアップとメタデータの発行を実行
    if not key_manager.setup_keys():
        logging.critical("鍵のセットアップに失敗しました。プログラムを終了します。")
        exit(1)
        
    # メタデータのチェックと発行
    key_manager.check_and_publish_metadata()
        
    logging.info("--- センサー投稿ループ開始 ---")
    
    try:
        while True:
            # 1. センサー固有のデータの読み取り
            temperature, humidity = read_dht11_data(DHT_SENSOR_TYPE, DHT_PIN_NUMBER)
            
            if temperature is not None and humidity is not None:
                logging.info(f"センサー値: 温度: {temperature:.1f}°C, 湿度: {humidity:.1f}%")

                # 2. kind=1イベントのコンテンツを準備
                pub_hex = key_manager.get_public_key_hex()
                npub = encode_pubkey(pub_hex)
                content = f"現在の気温: {temperature:.1f}°C, 湿度: {humidity:.1f}% (Raspberry Pi/DHT11より - npub: {npub})"

                # 3. イベントの作成と署名 (NostrKeyManagerの機能を利用)
                event = key_manager.create_kind1_event(content)

                # 4. イベントの公開 (NostrKeyManagerの機能を利用)
                if event:
                    key_manager.publish_event_to_relay(event)
                
            else:
                logging.warning("センサーデータの読み取りに失敗しました。スキップします。")
            
            logging.info(f"次の読み取りまで {READING_INTERVAL_SECONDS} 秒待機します...")
            time.sleep(READING_INTERVAL_SECONDS)

    except KeyboardInterrupt:
        logging.info("プログラムを終了します。")
    except Exception as e:
        logging.critical(f"メインループで予期せぬクリティカルエラーが発生しました: {e}")

TO Updated by Tomohiko Okamoto 7 days ago Actions #3

  • % Done changed from 0 to 50
Actions

Also available in: PDF Atom