5.1.2 Pythonカスタムコンポーネント作成
本項では、Pythonによるカスタムコンポーネントの作成方法について説明します。
5.1.2.1 前提知識#
カスタムコンポーネントを開発する上での前提知識として、Python言語のチュートリアルの内容は大方把握できていること、併せてクラス定義、メソッド定義、クラスの継承等について理解していることを想定しています。 入門レベルの情報については、下記の Python公式ドキュメントを参照してください。
Python公式ドキュメント: https://docs.python.org/ja/3/
5.1.2.2 サンプルカスタムコンポーネント#
Pythonによるカスタムコンポーネントの実装方法説明のため、シンプルな仕様のカスタムコンポーネントをサンプルとして提供しています。
もしくは、SpeeDBee Synapseをインストール済みの場合、カスタムコンポーネントサンプルも下記の場所にインストールされています。
$ ls /usr/local/speedbeesynapse/share/sample_component_py/
sin_and_random.py read_and_log.py
5.1.2.2.1 サンプルの内容#
カスタムコンポーネントのサンプルとして本章では下記のサンプルを説明します。
-
sin_and_random.py
出力ポートに対して以下の2つのカラムを作成し、データを出力し続けるコンポーネントです。
カラム名 データ型 内容 sin DOUBLE 1HzのSIN波の値を秒間10件登録します。SIN波の周期はパラメータで変更可能です rand INT32 1秒に1回、[0, 100)の範囲内のいずれかの値をランダムに登録します -
read_and_log.py
入力ポートからデータを受け取り、それを文字列型カラムとログに出力します。
5.1.2.2.2 実行#
*.py
ファイルをそのまま画面から登録することで、エラー等がなければコンポーネントベースが追加された状態になります。登録方法は「カスタムコンポーネントを使用する」を参照してください。
登録後は、通常のコンポーネントと同様にD&Dでコンポーネントインスタンスを生成できます。
sin_and_random
およびread_and_log
を繋いで実行することで、それぞれの出力したカラムを表示できます。
下記はsin_and_random
の出力ポートをモニタリングしています。
5.1.2.3 カスタムコンポーネント実装詳細#
以降では、サンプルのsin_and_random
、および、read_and_log
のソースコードを例に、カスタムコンポーネントの実装内容を説明していきます。
5.1.2.3.1 カスタムコンポーネントクラス定義#
Pythonによるカスタムコンポーネントは、下記の通りHiveComponentBase
クラスを継承した新しいクラスを定義することでそのコンポーネントベースの名前や識別情報、実行時の処理を定義することができます。
from speedbeesynapse.component.base import HiveComponentBase, HiveComponentInfo, DataType
:
@HiveComponentInfo(uuid='00000000-0000-0000-0000-000000000000', name='コンポーネント名', inports=0, outports=1)
class HiveComponent(HiveComponentBase):
def main(self, param):
:
クラス名は必ずHiveComponent
としてください。
また、コンポーネントの情報を@HiveComponentInfo
デコレータでラップする必要があります。
ここに定義した情報により、画面でもコンポーネントベースの情報を扱うことができるようになります。
下記の説明を参考に設定してください。
メンバ名 | 定義情報 | 説明 |
---|---|---|
uuid |
UUID | コンポーネントを識別するためのUUIDです。ランダムなIDを生成してここに設定してください。 |
name |
コンポーネント名 | コンポーネントの名前を設定します。画面ではこの名前が表示されますのでなるべく他のコンポーネントと被らず、分かりやすい名前にしてください。 |
tag |
タグ | コンポーネントへタグを付与します。 "collector","emitter","serializer","action","logic"のいずれかの指定によりコンポーネントベースの配置先を指定できます。省略した場合は、Customへ配置されます。 |
outports |
出力ポート数 | このコンポーネントが持つ出力ポートの数を設定します。他のコンポーネントの入力ポートにデータを渡す場合はこれを1としてください。 |
inports |
入力ポート数 | このコンポーネントが持つ入力ポートの数を設定します。他のコンポーネントの出力ポートからデータを受け取る場合は、これを1としてください。 |
UUIDについては必ずランダムなIDを生成してください。 他のコンポーネントと同じUUIDにしてしまうと、どちらのコンポーネントも使えなくなることがあります。
5.1.2.3.2 HiveComponent
クラスのライフサイクル#
ここで定義したHiveComponent
クラスが、1つのコンポーネントベースとなります。
Pythonスクリプトファイル内で、このクラスをインスタンス化するような処理は必要ありません。
かわりに、画面上でコンポーネントベースをD&Dしてコンポーネントインスタンスを生成すると、自動でここで定義したHiveComponent
クラスのインスタンスが生成されます。
また、画面上でコンポーネントインスタンスを削除すると、このクラスのインスタンスが破棄されます。
そのため、HiveComponent
クラス内に__init__()
メソッドや__del__()
メソッドを定義しておけば、画面上でのコンポーネントインスタンスの生成・削除に合わせてそのメソッドが実行されることになります。
@HiveComponentInfo(uuid='00000000-0000-0000-0000-000000000000', name='サンプルコンポ', inports=0, outports=0)
class HiveComponent(HiveComponentBase):
def __init__(self):
# called at creating component instance
pass
def __del__(self):
# called at deleting component instance
pass
def premain(self, param):
# called before main()
pass
def postmain(self, param):
# called after main() finished
pass
def main(self, param):
# called at starting component instance
while self.is_runnable():
pass
def notify_stop(self):
# called before the component received stop request from system
pass
他にも、画面からのコンポーネントの操作により、コールされるメソッドがあります。 コールされるタイミングとそのメソッドについては下図、下表を参照してください。
コールバック関数 | 説明 |
---|---|
__init__ |
コンポーネントのインスタンスが生成されたときにコールされます |
__del__ |
コンポーネントのインスタンスが破棄されるときにコールされます |
premain |
コンポーネントを開始するときにmainの前にコールされます |
main |
コンポーネントのメインの処理を実装する関数です コンポーネントの終了要求を受けるまで、この関数は終了せずにループ処理し続ける必要があります |
postmain |
コンポーネントの終了時、mainの終了直後にコールされます |
notify_stop |
mainの実行中、コンポーネントの停止要求を受けたときに実行されます このメソッドは main() の実行中に呼ばれるため、別スレッドで実行されることに注意してください。 |
main
は、コンポーネントの処理の本体になりますので、必ず定義する必要があります。
以降ではサンプルプログラムのmain
関数の実装方法を説明します。
5.1.2.3.3 出力ポートのカラム作成/データ登録(sin_and_random
)#
下記はカスタムコンポーネントのサンプルsin_and_random
のメインの処理です。
これをもとにデータを出力する流れを説明します。
def main(self, param):
:
count = 0
# Create columns.
self.sin = self.out_port1.Column('sin', DataType.DOUBLE)
self.rand = self.out_port1.Column('rand', DataType.INT32)
# Repeat at 100msec intervals.
for [ts, skip] in self.interval_iteration(100000000):
# Compute sin wave value and insert it into 'sin' column.
val = math.sin(2*math.pi*sin_hz*count/10)
self.sin.insert(val, ts)
# Insert random value into 'rand' column only once in 10.
if count % 10 == 0:
self.rand.insert(random.randrange(0, 100), ts)
count += 1
-
カラムの作成
初めに、カラムを作成します。
self.sin = self.out_port1.Column('sin', DataType.DOUBLE) self.rand = self.out_port1.Column('rand', DataType.INT32)
このコンポーネントでは倍精度浮動小数点数のカラム
sin
と、32bit整数のカラムrand
を利用しますので、self.out_port1
にあるColumn()
メソッドにてカラム名、データ型等を指定して、カラムオブジェクトを生成しています。 戻り値は後にカラムにデータを登録する際に使用しますので、変数に格納しておきます。コンポーネントが出力するすべてのデータは、このように生成したカラムに対して登録していく必要があります。
-
定期的な処理の繰り返し
定期的にデータを登録するため、for文で繰り返し処理を実装しています。
for [ts, skip] in self.interval_iteration(100000000): :
ループの判定条件で呼び出している
self.interval_iteration()
は、引数に指定した時間間隔で繰り返し値を返すジェネレータになっています。 それにより、このfor文の内部は一定間隔で実行されることになります。引数に指定するのはナノ秒単位になりますので、この例の場合は0.1秒(100000000ナノ秒)に1回、ループ処理が実行されることになります。
画面からコンポーネントを停止しない限り、このfor文のループは終わることはありません。
-
カラムへのデータ登録
繰り返し処理の中で、カラムに対してデータを登録します。
val = math.sin(2*math.pi*sin_hz*count/10) self.sin.insert(val, ts) if count % 10 == 0: self.rand.insert(random.randrange(0, 100), ts)
データの登録は、事前に作成したカラムオブジェクトの
insert()
メソッドの呼び出しで行なえます。 第一引数には登録する値を、第二引数にはその値を登録する時刻を指定してください。第二引数は省略すれば現在時刻として扱われます。
5.1.2.3.4 入力ポートからのデータ読み込み(read_and_log
)#
下記はカスタムコンポーネントのサンプルread_and_log
のメインの処理です。
これをもとに別のコンポーネントが出力したデータを受け取る処理の流れを説明します。
def main(self, param):
# Create a column for logging received data.
self.logclm = self.out_port1.Column('log', DataType.STRING)
# Logging column information received from 'in_port1'.
columns = self.in_port1.get_columns()
for column in columns:
self.log.info(column)
# Start receving column values from 'in_port1'.
with self.in_port1.ContinuousReader(start=self.get_timestamp()) as reader:
# Loop until the component received 'stop' request.
while self.is_runnable():
# Get a group of records of column data (called as window_data).
window_data = reader.read()
if not window_data:
# window_data may be None.
self.log.info("no data yet")
continue
# Process all records in a window_data one by one.
for record in window_data.records:
# Convert timestamp as a string.
ts = datetime.datetime.fromtimestamp(record.timestamp / 1000000000)
# Process all column's value in a record one by one.
for columnvalue in record.data:
if columnvalue.value:
# Make string from columnvalue, logging it and insert it into the column
logmsg = f"{columnvalue.column}[{str(ts)}]: {columnvalue.value}"
self.log.info(logmsg)
self.logclm.insert(logmsg)
else:
pass # ignore None
先頭で作成しているlog
カラムは、文字列として情報を出力するためのものです。
別コンポーネントからデータを受けるだけなら必須ではないため割愛します。
-
入力ポートのカラム情報の取得
in_port1
にあるget_columns()
メソッドにより、そのコンポーネントの入力ポートに接続されたコンポーネントが出力しているデータのカラム情報を取得できます。columns = self.in_port1.get_columns() for column in columns: self.log.info(column)
上記の例では取得したカラムをイテレーションループでログに出力しています。
コンポーネントへの接続を変えたり、接続されたコンポーネントを停止したりすると、この
get_columns()
で取得できるカラム情報が変化します。 常に最新の情報が必要な場合には定期的に取得して内容をチェックするようにしてください。 -
ContinuousReader
の開始とループ入力ポートから継続的にデータを取得し続ける場合、
ContinuousReader
オブジェクトを利用します。 下記のようにwith句で使用し、さらに繰り返しread()
をコールすることで入力ポートに届いたデータを取得することができます。with self.in_port1.ContinuousReader(start=self.get_timestamp()) as reader: while self.is_runnable(): window_data = reader.read() if not window_data: self.log.info("no data yet") continue :
read()
の戻り値はNoneになることもあるのでその場合は読み飛ばしてください -
取得したデータのレコードを参照
ContinuousReader
のread()
メソッドは、HiveWindowData
オブジェクトを返します。 このオブジェクトは、入力ポートに入ってきたデータをある時間幅分まとめて保持したもので、下図のように同一時刻のデータを収めた複数のレコードからなり、1つのレコードに複数のカラムのデータが収められています。そのため、下記のように二重のforループ文によりすべてのデータにアクセスすることができます。
for record in window_data.records: ts = datetime.datetime.fromtimestamp(record.timestamp / 1000000000) for columnvalue in record.data: if columnvalue.value: logmsg = f"{columnvalue.column}[{str(ts)}]: {columnvalue.value}" self.log.info(logmsg) self.logclm.insert(logmsg) else: pass # ignore None
各データの詳細は
HiveRecordData
オブジェクトを参照してください。
5.1.2.3.5 ログ出力#
カスタムコンポーネントでは、下記のようにself.log
のメソッドを利用することで任意のタイミングでログを出力することができます。
詳細はHiveLog
を参照してください。
self.log.info('create columns')
self.log.info(f'insert value={val} into {column_name}')
self.log.error('invalid data received')
出力したログは、画面からダウンロードすることができます。「各種ログ」を参照してください。
5.1.2.3.6 コンポーネントパラメータ#
画面上からコンポーネントインスタンスを生成すると、その設定項目としてコンポーネントの実行時パラメータを設定することができます。 インスタンスごとにパラメータ設定を変えることで、同じコンポーネントベースの実装でも、コンポーネントの動作を変えることができます。
画面で指定する際、パラメータのタイプはSTRING、もしくは、JSONを選択できます。実装時に扱いやすいタイプを選択してください。
パラメータタイプ | 説明 |
---|---|
STRING | 任意の文字列を設定できます。 コンポーネントの各メソッドの第二引数では str 型として参照できます。 |
JSON | JSON文字列を設定できます。 コンポーネントの各メソッドの第二引数では dict 型として参照できます。 |
コンポーネントインスタンスに設定したパラメータは、定義したHiveComponent
クラスのpremain()
,main()
,postmain()
メソッドの第二引数として取得することができます。
@HiveComponentInfo(uuid='00000000-0000-0000-0000-000000000000', name='サンプルコンポ', inports=0, outports=0)
class HiveComponent(HiveComponentBase):
def premain(self, param):
// use 'param' here.
pass
def postmain(self, param):
// use 'param' here.
pass
def main(self, param):
// use 'param' here.
while ...
これらの扱いはカスタムコンポーネントの実装者次第です。
例としてsin_and_random
では、パラメータに数値文字列をsin
カラムの波形の周波数として利用していますので参考にしてください。
5.1.2.4 カスタムコンポーネント用API#
本項で紹介した関数以外にも、多数のAPI関数が用意されています。 詳細は付録の「カスタムコンポーネントPython-APIリファレンス」を参照してください。
5.1.2.5 カスタムコンポーネントを使用する#
ここでは、作成したカスタムコンポーネントをSpeeDBee Synapseで使用する方法について説明します。
5.1.2.5.1 登録#
次の手順で、作成したカスタムコンポーネントをSpeeDBee Synapseに登録することができます。
-
設定メニューアイコンを押下し、「カスタム(Python)」を選択します。
-
カスタムコンポーネントの「追加」を押下し、カスタムコンポーネントのファイルを登録します。
-
「閉じる」を押下します。
-
確認ダイアログで「はい」を選択し、変更を反映するために、システムを再起動させます。
-
左メニューに、登録したカスタムコンポーネントが反映されます。
ファイルの保存先
登録したカスタムコンポーネントのファイルは、次のディレクトリに保存されます。
/var/speedbeesynapse/components_python_python1/speedbeesynapse/component/custom
保存先のディレクトリは、画面項目「保存先」に表示されます。
Pythonバージョン
カスタムコンポーネントを実行するPythonのバージョンは、画面項目「Python基本情報」の「Pythonバージョン」に表示されます。
Pythonファイル名
カスタムコンポーネントはSpeeDBee Synapseではモジュールとして扱っており、ファイル名をモジュール名としています。 したがって、ファイル名に"-"を使用した場合正しく扱えません。
Pythonの命名規則に従いファイル名は短くすべて小文字で区切りは"_"を使用してください。
5.1.2.5.2 設定画面#
カスタムコンポーネントの設定画面の項目は次の通りです。
項目 | 説明 |
---|---|
名称 | コンポーネントの名前を入力 ※他のコンポーネント名と重複する事はできません。 |
自動起動無効 | コンポーネントの自動起動を無効にする場合ON |
パラメータータイプ | パラメータの種類を次の中から選択 ・STRING:文字列 ・JSON:JSON文字列 |
パラメーター | コンポーネントパラメータを入力 |
スクリプトで使用するファイルをアップロードする(証明書など) | カスタムコンポーネントの処理で使用する、証明書などのファイルをアップロードする場合ON |
ファイル追加 | 押下すると、スクリプトで使用するファイルをアップロードすることができます。 ※新しく追加されたファイルは、コンポーネントの設定を保存した時にアップロードされます。 |
ファイルパス | アップロードしたファイルの保存先 ※パラメータでファイルパスを使用する場合、変数を用いることができます。詳細は、表題の「ファイルパス」の右のアイコンをクリックすると確認することができます。 |
設定画面のパラメータ入力部分に、ユーザーが定義した画面項目を表示するように変更することができます。 詳細は「カスタムUI作成」をご覧ください。
5.1.2.5.3 削除#
次の手順で、登録したカスタムコンポーネントをSpeeDBee Synapseから削除することができます。
-
設定メニューアイコンを押下し、「カスタム(Python)」を選択します。
-
カスタムコンポーネントの「削除」を押下します。
-
確認ダイアログで「はい」を選択します。
-
カスタムコンポーネントが削除されます。
5.1.2.5.4 表示#
次の手順で、登録したカスタムコンポーネントの内容を表示することができます。
-
設定メニューアイコンを押下し、「カスタム(Python)」を選択します。
-
カスタムコンポーネントの「表示」を押下します。
-
カスタムコンポーネントのソースコードが画面に表示されます。
5.1.2.5.5 ダウンロード#
次の手順で、登録したカスタムコンポーネントをダウンロードすることができます。
-
設定メニューアイコンを押下し、「カスタム(Python)」を選択します。
-
カスタムコンポーネントの「取得」を押下します。
-
カスタムコンポーネントのファイルがダウンロードされます。
5.1.2.5.6 外部ライブラリの追加#
次の手順で、カスタムコンポーネントが使用する外部ライブラリを、SpeeDBee Synapseに追加することができます。
-
設定メニューアイコンを押下し、「カスタム(Python)」を選択します。
-
Pythonモジュールの入力欄に、登録するライブラリの名前を入力します。
-
Pythonモジュールの「追加」を押下します。
-
確認ダイアログで「はい」を選択します。
-
ライブラリが登録されます。
登録した外部ライブラリは、次のディレクトリに保存されます。
/var/speedbeesynapse/dynlibs/pyvenv
保存先のディレクトリは、画面項目「保存先」に表示されます。
.whlファイルから追加する
Pythonモジュールの「whl」を押下し、.whlファイルを選択します。
pipバージョン
外部ライブラリの管理で使用するpipのバージョンは、画面項目「Python基本情報」の「pipバージョン」に表示されます。
5.1.2.5.7 外部ライブラリの削除#
次の手順で、追加した外部ライブラリを、SpeeDBee Synapseから削除することができます。
-
設定メニューアイコンを押下し、「カスタム(Python)」を選択します。
-
削除するライブラリをクリックします。
-
Pythonモジュールの「削除」を押下します。
-
確認ダイアログで「はい」を選択します。
-
ライブラリが削除されます。