ITで遊ぶ

PyUSB 私家版日本語訳(3)

USBを介して周辺装置と会話する

以下、USB接続、通信方法の基本をこちらあたりで学習されてから読まれることを強くお勧めします。

Pythonは3をお勧めします。(っていうか、2009年に発表なんだから、いい加減にpython2は終わってほしい。あちこちに残ってて邪魔)

とはいえ、pyusbは古くネットにもいいサンプルがありませんし、面倒くさいようです。
よほどの理由がないと使うことはお勧めしません。

 

pyUSBとは

もちろん、
PyUSBはPythonで書かれたUSBドライバーとコミュニケーションするパッケージです。
サポートされるドライバーはlibusb 0.1, libusb 1.0とOpen USBです。

 

パッケージの中には以下のモジュールが含まれます。

モジュール 役割
core メインのUSBのモジュール
util ユーティリティ機能
control コントロール
legacy 0.x時代の互換性のため
backend 組み込みバックエンドのはいったサブパッケージ

サンプル

サンプルコードは以下のとおり

import usb.core
import usb.util

# find our device
dev = usb.core.find(idVendor=0xfffe, idProduct=0x0001)

# was it found?
if dev is None:
    raise ValueError('Device not found')

# set the active configuration. With no arguments, the first
# configuration will be the active one
dev.set_configuration()

# get an endpoint instance
cfg = dev.get_active_configuration()
intf = cfg[(0,0)]

ep = usb.util.find_descriptor(
    intf,
    # match the first OUT endpoint
    custom_match = \
    lambda e: \
        usb.util.endpoint_direction(e.bEndpointAddress) == \
        usb.util.ENDPOINT_OUT)

assert ep is not None

# write the data
ep.write('test')

解説

最初の二行

import usb.core
import usb.util

PyUSBパッケージを持ち込む命令で、usb.coreはメインであり、usb.utilはユーティリティ機能を担うものです。

# find our device
dev = usb.core.find(idVendor=0xfffe, idProduct=0x0001)

目的のデバイスを探し、見つけたらオブジェクトのインスタンスを作る命令です。
もし、見つからないとNoneが帰ってきます。

# set the active configuration. With no arguments, the first
# configuration will be the active one
dev.set_configuration()

使うためのコンフィギュレーションをセットします。ここで引数がありませんが、PyUSBは多数あるコンフィギュレーションについて最も平均的なデバイス用に省略値をもっています。ここでは特にfindでみつけたコンフィグレーションがセットされます。

# get an endpoint instance
cfg = dev.get_active_configuration()
intf = cfg[(0,0)]

ep = usb.util.find_descriptor(
    intf,
    # match the first OUT endpoint
    custom_match = \
    lambda e: \
        usb.util.endpoint_direction(e.bEndpointAddress) == \
        usb.util.ENDPOINT_OUT)

assert ep is not None

操作したいエンドポイントを探し、最初のインターフェースの中を調べます。エンドポイントをみつけた後、データをおくるのです。、エンドポイントとはデバイス側がもつ通信のためのFIFOバッファのことをいいます。

実はエンドポイントはUSBデバイスをホストに接続した途端に0はできます。
このエンドポイントを利用して、コンフィグレーションのやり取りをする必要があるからです。
これを「エンドポイント0」とか「FIFO0」とか言います。

エンドポイントは0以外は双方向通信できません。1-15 の番号と方向INかOUTを定義せねばなりません。
(USB3.0はIN/OUTの指定もできるようです。)

assertはPythonの命令で予期した値でなければ、例外を発生します。

もし、すでにエンドポイントのアドレスが決まっているならば、このようにwriteすればいいです。

dev.write(1,'test')

この例ではエンドポイントは1です。

エラーハンドリング

PyUSBはどの機能についてもエラーならばエクセプションをあげます。エラーはusb.core.USBErrorにあります。

したがってエラーハンドリングコードは次のようになります。

# was it found?
if dev is None:
    raise ValueError('Device not found')

また、PyUSBはログ機能もあります。loggingモジュールを使うためには、環境変数PYUSB_DEBUGをセットしてください。critical, error, warning, info, debugがあります。
省略値のメッセージはsys.stでっろrに送られます。ログメッセージをファイルにリダイレクトしたい場合は環境変数PYUSB_LOG_FILENAMEにファイル名を設定してください。

目的のUSBデバイスの見つけ方

coreモジュールにあるfind()関数は接続されているデバイスを列挙します。たとえば、Vender IDが0xfffeでProduct IDが0x0001ならば、こういうコードになります。

import usb.core

dev = usb.core.find(idVendor=0xfffe, idProduct=0x0001)
if dev is None:
    raise ValueError('Our device is not connected')

usb.core.Deviceオブジェクトが、そのデバイスを意味することになります。失敗するとNoneが入ります。
デバイスの詳細情報はDescriptorにあります。例えば、USBが接続されているかを調べるには次のようになります。

# actually this is not the whole history, keep reading
if usb.core.find(bDeviceClass=7) is None:
    raise ValueError('No printer found')

7はUSBの仕様でプリンタークラスを示すコードです。
逆にプリンターだけをリストすることもできます。

# this is not the whole history yet...
printers = usb.core.find(find_all=True, bDeviceClass=7)

# Python 2, Python 3, to be or not to be
import sys
sys.stdout.write('There are ' + len(printers) + ' in the system\n.')

find関数のfind_allのパラメーターは省略値はFalseです。Falseの場合、findeは指定した(後述)条件に適合する最初のデバイスを戻してきます。Trueの場合はマッチしたデバイスをリストの形で戻します。

他にもfind関数で独自の条件でデバイスを探すことができます。同じプリンターを探すルーチンです。
find関数でcustom_matchにクラスを渡し、クラス内のbDeviceClassでプリンターをみつけているところに注意してください。

import usb.core
import usb.util
import sys

class find_class(object):
    def __init__(self, class_):
        self._class = class_
    def __call__(self, device):
        # first, let's check the device
        if device.bDeviceClass == self._class:
            return True
        # ok, transverse all devices to find an
        # interface that matches our class
        for cfg in device:
            # find_descriptor: what's it?
            intf = usb.util.find_descriptor(
                                        cfg,
                                        bInterfaceClass=self._class
                                )
            if intf is not None:
                return True

        return False

printers = usb.core.find(find_all=1, custom_match=find_class(7))

oxfffeベンダーのプリンターを見つけるには次のように書けばいいことになります。

# find all printers that belongs to our vendor:
printers = usb.core.find(find_all=1, custom_match=find_class(7), idVendor=0xfffe)

USB装置を見つける単体プログラムとしては、これ。
これでつながっているUSBデバイスがリストされます。

#!/usr/bin/python
import sys
import usb.core
# find USB devices
dev = usb.core.find(find_all=True)
# loop through devices, printing vendor and product ids in decimal and hex
for cfg in dev:
  sys.stdout.write('Decimal VendorID=' + str(cfg.idVendor) + ' & ProductID=' + str(cfg.idProduct) + '\n')
  sys.stdout.write('Hexadecimal VendorID=' + hex(cfg.idVendor) + ' & ProductID=' + hex(cfg.idProduct) + '\n\n')

デスクリプター(詳細情報)

さて、デバイスを見つけたら、コンフィグレーション、インターフェース、エンドポイント、転送方式などをしりたくなります。

対話式には次のように行うとフィールド値がわかります。

>>> dev.bLength
>>> dev.bNumConfigurations
>>> dev.bDeviceClass
>>> # ...

個々のデバイスのコンフィグレーションを知るためには次のようなループとなります。

for cfg in dev:
    sys.stdout.write(str(cfg.bConfigurationValue) + '\n')

同様にしてコンフィグレーションからインターフェースを知り、インターフェースからエンドポイントがわかります。
こんな感じ。

for cfg in dev:
    sys.stdout.write(str(cfg.bConfigurationValue) + '\n')
    for intf in cfg:
        sys.stdout.write('\t' + \
                         str(intf.bInterfaceNumber) + \
                         ',' + \
                         str(intf.bAlternateSetting) + \
                         '\n')
        for ep in intf:
            sys.stdout.write('\t\t' + \
                             str(ep.bEndpointAddress) + \
                             '\n')

順々に見ていかなくても、値を指定することでデスクリプターを入手することもできます。

>>> # access the second configuration
>>> cfg = dev[1]
>>> # access the first interface
>>> intf = cfg[(0,0)]
>>> # third endpoint
>>> ep = intf[2]

ご存知のようにインデックスは0から始まります。しかし、すでにインターフェースが他のものと接続されているかもしれません。そういう場合、二番目の代替アドレスをcfg[(0,1)]というようにセットできます。

ここまでくると、find_descriptorユーティリティ関数を使えます。
すでにプリンターの例を見ましたが、find_desciptorはfindと似た機能ですが、次の二点が違います。

  • find_descriptorは見つけた最初のデスクリプターを返します。
  • backendパラメタがない

たとえば、コンフィグレーションデスクリプターcfgがすでにあり、インターフェース1以外のセッティングすべてを見つけたいならば、

import usb.util
alt = usb.util.find_descriptor(cfg, find_all=True, bInterfaceNumber=1)

usb.utilモジュール内のfind_descriptorは先に書いたcustom_matchパラメーターと似たことができます。

複数のデバイスを扱う場合、、、(省略)

注意

USBデバイスは仕様で、追加の設定なしでSET_INTERFACE要求を受け取るとエラーを返します。したがってひとつ以上の別の設定のないインターフェースを設定する場合、try-exceptブロックにしておくことです。

try:
    dev.set_interface_altsetting(...)
except USBError:
    pass

また、関数のパラメータとしてIntrerfaceオブジェクトを使うと、interfaceとalternate_settingパラメータは自動的にbInterfaceNumberとbAlternateSettingフィールドから推測されます。

>>> intf = find_descriptor(...)
>>> dev.set_interface_altsetting(intf)
>>> intf.set_altsetting() # wow! Interface also has a method for it

Interfaceオブジェクトは現在アクティブなコンフィグレーションデスクリプターに属していなければなりません。

転送方式

ここまで来るとデータの転送ができます。転送方式はコントロール転送、バルク転送、インタラプト転送、アイソクロナス転送があります。

コントロール転送は仕様に基づく構造化されたデータを転送する唯一の方法です。他の転送方式はデータを送受信するだけの方法です。

コントロール転送にはctrl_transferメソッドを使います。INでもOUTでも使います。方向はbmRequestTypeパラメーターで決めます。

ctrl_transferパラメーターはコントロール要求の構造とほぼおなじパラメーター群です。以下は例です。

>>> msg = 'test'
>>> assert dev.ctrl_transfer(0x40, CTRL_LOOPBACK_WRITE, 0, 0, msg) == len(msg)
>>> ret = dev.ctrl_transfer(0xC0, CTRL_LOOPBACK_READ, 0, 0, len(msg))
>>> sret = ''.join([chr(x) for x in ret])
>>> assert sret == msg

この例ではふたつのカスタムコントロールがループバックのパイプとして定義されています。CTRL_LOOPBACK_WRITEでメッセージをかきだすと、CTRL_LOOPBACK_READでメッセージを読めます。

最初の4つのパラメーターはbmRequestType, bmRequest, wValue, wIndexです。
これらはコントロール転送で必要な標準の構造です。5番目のパラメーターはOUTで送出するデータ自身か、INでよみこむバイト数です。

送出するデータはどんな__init__メソッドで示された配列で使えるタイプならどれでも可能です。
もし、送出データがないならばNone(もしくは、INの場合は0)を設定します。
最後にタイムアウト時を設定できます。設定しなければ、デフォルトのタイムアウト値になります。
OUT転送において、戻る値は送出したデータのバイト数です。IN転送では読んだデータの配列オブジェクトが戻ります。

ほかのタイプの転送ではwriteとreadが使えます。read,writeの場合は転送タイプをきにすることはなく、エンドポイントのアドレスから自動的に転送モードは決まります。
このサンプルはエンドポイント11のパイプのループバックを使った例です。

>>> msg = 'test'
>>> assert len(dev.write(1, msg, 100)) == len(msg)
>>> ret = dev.read(0x81, len(msg), 100)
>>> sret = ''.join([chr(x) for x in ret])
>>> assert sret == msg

最初と三番目のパラメータはどちらのメソッドでも同じで、エンドポイントのアドレスとタイムアウト値です。二番目のパラメタはwriteの場合はデータでreadの場合は読むデータの長さです。
戻り値はreadメソッドの場合は配列オブジェクトでwriteの場合はバイト数です。

バージョン2のベータ依頼、バイト数の変わりにreadの時にctrl_transferにデータが読み込まれる配列オブジェクトを用意して渡すことができるようになりました。この場合、読んだバイト数はarray.itmsize値の配列の数となります。

ctrl_transferではtimeoutパラメーターはオプションです。ない時はDevice.default_timeoutプロパティが使われます。

以下、再び省略。

 

実際に書くためには

あんまり面倒くさいことを知らなくてもなんとかなった。
ネットにあるサンプルをふたつ掲載しておくが、どうも古いようだ。
理由はpyusbでbulkread, bulkwriteはlegacyフォルダーにあり、古いインターフェースであるらしい。
endpointクラスのソースコードを読むとwrite, read, clear_haltしかない。

write(self, data, timeout=None)
エンドポイントにデータを送出します。データとタイムアウトを設定します。転送タイプとエンドポイントアドレスは自動的に継承されます。
戻り値は書き込んだバイト数です。(selfはpythonのクラスを参照)
read(self, size_or_buffer, timeout=None)
エンドポイントから読み込みます。バッファーサイズかバッファー配列を指定しておきます。
clear_halt(self)
エンドポイントのhalt/statusコンディションをクリアします。

サンプル

#!/usr/bin/python -i

import usb

def opendevice(idVendor, idProduct):
    devices=[]

    for b in usb.busses():
        for d in b.devices:
            if d.idVendor==idVendor and d.idProduct==idProduct:
                devices.append(d)

    if len(devices)==1:
        device=devices[0]
        return device

    elif not devices:
        raise "Device not found"
    else:
        raise "More than one device found"

if __name__=="__main__":

    device=opendevice(0x04d8, 0x0032)
    packet_len=8

    dh=device.open()

    dh.setConfiguration(2)
    dh.claimInterface(0)

    # First test, turn power off
    dh.bulkWrite(1,"V0"+(packet_len-2)*"Z",1000)
    # Second test, get version number
    dh.bulkWrite(1,"v"+(packet_len-1)*"Z",1000)
    r=dh.bulkRead(1,3)
    print r

サンプル2

import usb

class UsbDevice:
    def __init__(self, idVendor, idProduct):
        busses = usb.busses()
        for bus in busses:
            devices = bus.devices
            for device in devices:
                #print hex(device.idVendor), hex(device.idProduct)
                if (device.idVendor, device.idProduct) == (idVendor, idProduct):
                    self.device = device
                    self.configuration = self.device.configurations[0]
                    self.interface = self.configuration.interfaces[0][0]
                    self.endpoints = []
                    self.pipes = []
                    for endpoint in self.interface.endpoints:
                        self.endpoints.append(endpoint)
                        self.pipes.append(endpoint.address)
                    return
        raise RuntimeError, 'Device not found'

    def open(self):
        if hasattr(self, 'handle'):
            raise RuntimeError, 'Device already opened'
        self.handle = self.device.open()
        self.handle.setConfiguration(self.configuration)
        self.handle.claimInterface(self.interface)
        self.handle.setAltInterface(self.interface)

    def close(self):
        if hasattr(self, 'handle'):
            self.handle.releaseInterface()
            del self.handle
        else:
            raise RuntimeError, 'Device not opened'

    def bulkWrite(self, pipeno, buffer, timeout=100):
        if hasattr(self, 'handle'):
            self.handle.bulkWrite(self.pipes[pipeno], buffer, timeout)
        else:
            raise RuntimeError, 'Device not opened'

    def bulkRead(self, pipeno, size, timeout=100):
        if hasattr(self, 'handle'):
            data = self.handle.bulkRead(self.pipes[pipeno], size, timeout)
            return map(lambda x: 0xFF & x, data)
        else:
            raise RuntimeError, 'Device not opened'

if __name__=='__main__':
    device = UsbDevice(0x0547, 0x1002) # EZ-USB FX2
    device.open()
    #print device.pipes
    device.bulkWrite(0, [0])
    print device.bulkRead(2, 12, 1000)
    device.close()

 

サンプル3(SCPIで測定器と会話する。どこにもサンプルがなかった。俺すごい)

SCPIは測定器を操作するための共通のコマンドです。かなりの装置がコントロールできます。

このサンプルを動かすための手順(以下全部、環境はWindowsです)

1.準備
USBドライバーとしてlibusb-win32をインストールします。
私はOWONのAG-015Fで試しましたが、添付のwaveform generatorをインストールすることで同時にインストールできました。
(コードサイニングされていないため、デバイスドライバーは手動でインストールしなおす必要があります。つまり一旦インストールして、装置をUSBでつないでからデバイスマネージャを見ます。エラーになっているのでドライバーの更新でwaveform generator以下のUSBDRVフォルダーを指定し、Installを選びます。)
waveform generatorにはSCPIコマンドを試せる機能がありますから、テストして接続しているか確認できます。

Pythonの準備をします。
Pythonからpyusbパッケージをインストールしてください。
コマンド例:
>>>pip install pyusb

2.設定
下のプログラムの中に
dev = usb.core.find(idVendor=0xxxxx, idProduct=0xxxxx)
があります。
ここで目的の装置のidVendorとidProductを見つける必要があります。

上に記載している「USB装置を見つける単体プログラム」を実行し、idVendorとidProductを見つけます。
実行するとパソコンに接続されているUSB装置のidvenderとidProductがリストされます。
複数接続されていて、どれが目的の装置かわからない場合は一度目的の装置をはずしてfindusb.pyを実行し、接続した時に実行したリストとくらべてください。

3.実行
以下のサンプルの中のidVendorとidProductを書き換えます。
それだけで実行可能です。
SCPIコマンド、*IDN? の答えが目的の装置から戻ります。

import usb.core
import usb.util
import sys

dev = usb.core.find(idVendor=0xxxxx, idProduct=0xxxxx)

if dev is None:
	raise ValueError('Device not found')

dev.set_configuration()

cfg = dev.get_active_configuration()

itfs = cfg[(0,0)]

ep_out = usb.util.find_descriptor(
	itfs,
	custom_match = \
    lambda e: \
        usb.util.endpoint_direction(e.bEndpointAddress) == \
        usb.util.ENDPOINT_OUT)

ep_in = usb.util.find_descriptor(
	itfs,
	custom_match = \
	lambda e: \
        usb.util.endpoint_direction(e.bEndpointAddress) == \
        usb.util.ENDPOINT_IN)


ep_out.write("*IDN?")
ret = ep_in.read(64)
chars = " ".join(map(chr,ret))

print(chars)

(追記2021/11/20) 久しぶりにインストールから行うと、Windows版Python 3.10はひどい。
間違っているかもしれないが、pipもeasysetupも入っていなかった。
3.9では手持ちのPyUSBが動かなかった。Githubでエラーが報告されていた。
PyUSBの最新版をGithubから入手するか、Pythonを3.7くらいで動かしたほうがいいようだ。

関連記事

  1. ポケットオシロスコープ DSO Touchの説明書を翻訳したよ

  2. 小学生にプログラミング?言ってるおまえ、プログラム書けないだろ

  3. 3Dプリンター印刷品質問題の解決方法

  4. Arduinoに複数のシリアル通信をさせる

  5. DOAの落とし穴

  6. やっぱりドットインストールは勉強にいい

  7. XMLhttpRequest -> fetch

  8. Googleの検索結果をスプレッドシートに抜く方法

記事をプリント