ROS2を使ってみる(2)

ロボット

ここからはhttps://qiita.com/nasu_onigiri/items/dc10c3b0bfcd4754357fを試してみました。

  • 人感センサ… HC-SR501
  • サーボモータ*2つ… MG90S
  • サーボドライバ… PCA9685

によるシンプルな構成です。

ROS2パッケージの作成

mkdir -p ~/ros2_1/src

でディレクトリーを作成し、cdコマンドでros2_1に移動します。

colcon build

ファイルがいっぱいできます。

次にソースコードのためのパッケージを作ります。

cd ~/ros2__1/src
ros2 pkg create  --build-type ament_python rpi_robot_py 

pythonでプログラムを書くのでament_pythonです。rpi_robot_pyはプロジェクト名です。

できたファイル構造です。

ros2_1
├── build
│   └── COLCON_IGNORE
├── install
│   ├── COLCON_IGNORE
│   ├── local_setup.bash
│   ├── local_setup.ps1
│   ├── local_setup.sh
│   ├── _local_setup_util_ps1.py
│   ├── _local_setup_util_sh.py
│   ├── local_setup.zsh
│   ├── setup.bash
│   ├── setup.ps1
│   ├── setup.sh
│   └── setup.zsh
├── log
│   ├── build_2022-06-04_11-11-31
│   │   ├── events.log
│   │   └── logger_all.log
│   ├── COLCON_IGNORE
│   ├── latest -> latest_build
│   └── latest_build -> build_2022-06-04_11-11-31
└── src
    └── rpi_robot_py
        ├── package.xml
        ├── resource
        │   └── rpi_robot_py
        ├── rpi_robot_py
        │   └── __init__.py
        ├── setup.cfg
        ├── setup.py
        └── test
            ├── test_copyright.py
            ├── test_flake8.py
            └── test_pep257.py

11 directories, 23 files

src/rpi_robot_py/package.xmlを編集します。nameとかmaintenarとかを修正します。

シンプルなロボットプログラム

HC-SR501の接続

あちこちに乗っています。例えばここ

入力ノードの作成

ここでは人感センサー(HC-SR501)からの入力を書きます。
場所はsrc/rpi_robot_piの下です。ファイル名はhc_sr501.pyとしました。


import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool

import pigpio
import time

SENSOR_GPIO_PIN = 17
TIMER_INTERVAL = 0.5

class HumanSensor(Node):
      def __init__(self):
        super().__init__('human_sensor_node')
        self.init_sensor()
        self.pub_sensor = self.create_publisher(Bool, '/input/human_sensor', 10)
        self.timer = self.create_timer(TIMER_INTERVAL, self.sensor_timer_callback)
        self.prev_sensor_data = False

    def sensor_timer_callback(self):
        sensor_msg = Bool(data=self.get_sensor_data())
        self.pub_sensor.publish(sensor_msg)

    def init_sensor(self):
        self.pi = pigpio.pi()
        self.pi.set_mode(SENSOR_GPIO_PIN, pigpio.INPUT)
        self.pi.set_pull_up_down(SENSOR_GPIO_PIN, pigpio.PUD_UP)

    def get_sensor_data(self):
        if self.pi.read(SENSOR_GPIO_PIN) == 1:
            return True
        else:
            return False

def main(args=None):
    rclpy.init(args=args)
    node = HumanSensor()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
main()

これを単体で動かします。
もうひとつターミナルを立ち上げ、

ros2 topic list

と入力すると、
/input/human_sensor (上のプログラムで定義した)
/parameter_events
/rosout
の3つがリストされます。
トピックとはROS2内でメッセージのやりとりの場ですから、/input/human_sensorの中を見てみましょう。

ros2 topic echo /input/human_sensor

と入力すると、

と次々に表示されます。trueのときはセンサーの前で手を振った時です。

これでセンサーが正しく動いていることと、メッセージ内容を確認できます。
ROS2 便利!

PCA9685とサーボモーターMG90Sの接続

モーターを接続するため、別の電源を用意します。(メンドクサ)
モータードライブはI2C接続で16チャンネルをもつ、PCA9685です。
制御線のところを拡大すると、

  • GND:電源のマイナスとラズパイのGND共通接続
  • OE:未接続(Hiで全ての動作を停止)
  • SCL:GPIO3番(SCL)ピンへ接続
  • SDA:GPIO2番(SDA)ピンへ接続
  • VCC:ラズパイから電源(3.3V)
  • V+:サーボモーター用電源

I2Cで動作するので、I2CライブラリーとPCA9685のドライバー(Adafruits謹製)をインストールします。

sudo apt install i2c-toolspip3 install adafruit-pca9685

i2Cが動いているか確認します。

ls -l /dev/i2c-1

このように存在していればOK

   crw-rw—- 1 root i2c 89, 1 May 31 21:32 /dev/i2c-1

ハードウェアを繋いだら以下のコマンドでI2Cの状況を見てみましょう。
なお、私のもっているサーボモーターMG90Sはオレンジ色の線がPWMでした。接続を間違えると動きませんからね。

sudo i2cdetect -y 1

こんな感じ

動作確認

こちらのサイトのお世話になります。(このプログラムはROS2と関係なく、動作テストプログラムです。私はDocumentsに保管してます)

# import RPi.GPIO as GPIO
import Adafruit_PCA9685
from time import sleep

# 設定周波数(Hz)
set_freq = 50

# 動作角度からPCA9685に渡す値を変換
def convert_deg(deg,freq):

	# 分解能(ステップ数)
	step = 4096

	# 接続サーボモーターの最大最小角度時のパルス間隔(ms)
	# この数値はSG90の仕様に基づくものですが、必要に応じて調整してください。
	max_pulse = 2.4 # 90°時
	min_pulse = 0.5 # -90°時

	# サーボモーターの0°時のパルス間隔(ms)
	center_pulse = (max_pulse - min_pulse) / 2 + min_pulse

	#サーボモーター1°あたりのパルス間隔(ms)
	one_pulse = round((max_pulse - min_pulse) / 180, 2)

	# 要求角度のパルス間隔(ms)を算出しPCA9685に渡す値を算出
	deg_pulse = center_pulse + deg * one_pulse
	deg_num = int(deg_pulse / (1.0 / freq * 1000 / step))

	# デバッグ
	print('deg:' + str(deg) + '(' + str(deg_num) + ')')

	return deg_num

pwm = Adafruit_PCA9685.PCA9685()
pwm.set_pwm_freq(set_freq)

try:
	while True:
		pwm.set_pwm(0, 0, convert_deg(0,set_freq))
		sleep(1)
		pwm.set_pwm(0, 0, convert_deg(45,set_freq))
		sleep(1)
		pwm.set_pwm(0, 0, convert_deg(0,set_freq))
		sleep(1)
		pwm.set_pwm(0, 0, convert_deg(-45,set_freq))
		sleep(1)

except KeyboardInterrupt:
	pwm.set_pwm(0, 0, 0)
	print("KeyboardInterrupt")
	pass
 

テストプログラムを動かすと、/etc/i2c-1へpermission deniedをくらいました。
調べると /etc/group にユーザーを追加し、権限をもつ必要があるようです。

sudo nano /etc/group

i2cの項目へ、他の項目のまねをして最後にubuntuを追加して、リブートしましょう。

うまく動作しました。

出力ノードの作成

サーボモーターのノード用プログラムを動かしてみます。
プログラムの注釈にあるゆにサーボモーターをPCA9685のチャネル0, 3に接続します。


import rclpy
from rclpy.node import Node
from rclpy.executors import SingleThreadedExecutor
# from rclpy.executors import MultiThreadedExecutor
from std_msgs.msg import Int8
import Adafruit_PCA9685

ANGLE_MIN = -70
ANGLE_MAX = 70

# PCA9685 の 0 と 3 に、2つのサーボを接続
SERVO_LEFT_ID = 0
SERVO_RIGHT_ID = 3

class Servo(Node):
    def __init__(self, servo_name, servo_id):
        super().__init__('servo_node_' + servo_name)
        self.init_pca9685()
        self.servo_name = servo_name
        self.servo_id = servo_id
        self.sub_topic_name = '/output/servo/' + servo_name
        self.sub_servo = self.create_subscription(Int8, self.sub_topic_name, self.servo_callback, 10)

    def init_pca9685(self):
        self.pwm = Adafruit_PCA9685.PCA9685(address=0x40)
        self.pwm.set_pwm_freq(60)

    def servo_callback(self, servo_msg):
        self.get_logger().info('subscribe servo angle: {}'.format(servo_msg.data))
        self.set_angle(servo_msg.data)

    def set_angle(self, angle):
        angle = max(ANGLE_MIN, angle)
        angle = min(ANGLE_MAX, angle)
        pulse = (600-150) / 180 * (angle + 90) + 150
        self.pwm.set_pwm(self.servo_id, 0, int(pulse))

def main(args=None):
    rclpy.init(args=args)
    executor = SingleThreadedExecutor()
    ## mutli thread の場合はこっち
    # executor = MultiThreadedExecutor(num_threads=2)
    ## ノード(クラス)を2つインスタンス化し、executor に登録
    node_left = Servo(servo_name='left', servo_id=SERVO_LEFT_ID)
    node_right = Servo(servo_name='right', servo_id=SERVO_RIGHT_ID)
    executor.add_node(node_left)
    executor.add_node(node_right)
    try:
        executor.spin()
    except KeyboardInterrupt:
        pass
    executor.shutdown()
    node_left.destroy_node()
    node_right.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
main()

ExecutorというものがNodeの実行管理をします。ここではnode_left, node_rightを一組としてExecutorに登録します。
Executor.spinで動き出します。Executor.shutdownで実行終了。

長くなったので次に続く。。。