AWS IoT と SkyOnDemandでIoTデータ連携

はじめに

早いもので2017年も一ヶ月が過ぎようとしています。
寒い日が続きますね。
そんなときは家でRasberry Piで遊んじゃいましょう。

データフロー

今回試したRasberry Pi からのデータ連携フローは下記のようなイメージです。

  1. Rasberry Pi からMQTTでAWS IoTへデータをPublish
  2. AWS IoTでは受け取ったデータをS3へCSVで出力
  3. S3へ出力したことをLambdaからSkyOnDemandへHTTPリクエストを送信して通知
  4. SkyOnDemadのHTTPトリガでHTTPリクエストを受け取り、S3からCSVファイルをダウンロード
  5. ダウンロードしたCSVを展開しSalesforceへ書き込む

awsiot_01.png

わざわざS3にCSVで保存しなくても、直接jsonでSkyOnDemadへ送れば。。。
という声が聞こえそうですが、ここはみんな大好きCSVを経由しました。

AWS IoT の設定

3ヶ月ぶり位にマネジメントコンソールからAWS IoTを触ったのですが、UIが全く別物になっていて、かなり右往左往しました。。。
AWS IoTの設定は基本的に下記の手順で行います。

  1. Thingの作成
  2. Pollicyの作成
  3. 証明書の作成
  4. 証明書をThingとPolicyに紐付ける
  5. Ruleの作成
  6. IAMロールの作成

マネジメントコンソール → AWS IoT → Connect → Configuring a device を使うと、上記1~4を順次実行できて便利です。

RuleでRasberry Pi から受け取ったMQTTデータを処理するフローを定義します。
ここでは、topic_2というトピック名で受け取り、S3へのPutとLambda関数の実行を定義しています。

awsiot_03.png

Lambda Functionは下記のようにSkyOnDemandへHTTPリクエストを送信する簡単なものです。
なお、CID, SID, Authorization の各文字列はSkyOnDemandのHTTPトリガ作成時に作られるものです。
TECH BLOGで戸塚が詳しく書いてます。

var http = require ('https');
exports.handler = function(event, context) {

    var options = {
		hostname: 'www.skyondemand.net',
		port: 443,
		path: '/ws/trigger/awsLambda?cid=XXXXX&sid=XXXXX',
		method: 'GET',
		headers: {
                    'Authorization': 'Basic ' + 'XXXXXXXXXXXXXXXXXXXX'
                }
    };

    var req = http.request(options, function(res) {
         res.setEncoding('utf8');
    });

    req.on('error', function(e) {
        context.done('error', e);
    });

    req.write('data\n');
req.end(); };

Rasberry Pi からAWS IoTへPublishする環境の構築

Rasberry Pi側はAWSの「 IoT Device SDK for JavaScript 」を使います。
上記を使うためにはNode.jsをインストールする必要がありますが、かなり手順が多いです。
途中でエラー祭りが始まり四苦八苦した結果たどり着いた手順が下記です。

  1. nvmのインストール
    $ git clone https://github.com/creationix/nvm.git ~/.nvm
    $ source ~/.nvm/nvm.sh
  2. node.jsのインストール
    $ nvm install v6.9.4
  3. .bashrcへパスを通す
    vi /home/pi/.bashrc
    if [[ -f ~/.nvm/nvm.sh ]]; then
    source ~/.nvm/nvm.sh
    nvm use 6.9.4
    fi
  4. npmのインストール
    $sudo apt-get install npm
  5. expressのインストール
    npm install express --save
  6. aws-iot-device-sdk-js のインストール
    ようやくここまで来ました!!

    $ git clone https://github.com/aws/aws-iot-device-sdk-js.git
    $ cd aws-iot-device-sdk-js
    $ npm install mqtt
    $ npm install blessed
    $ npm install blessed-contrib
    $ npm install minimist
    $ npm install crypto-js
    $ npm install websocket-stream
    $ npm install date-utils

  7. マネジメントコンソールで作成した証明書は下記に置いてあげます。
    /home/pi/aws-iot-device-sdk-js/certs

さて、いよいよ実行です。
/home/pi/aws-iot-device-sdk-js/examplesにサンプルスクリプトが用意されてるので実行してみます。
ドキドキ!

  $ node examples/device-example.js -f ../certs -g ap-northeast-1

awsiot_04.png

お!connectって出来ますね。
S3にファイルは出来てるでしょうか?

awsiot_05.png

できてますね!
中身は?

awsiot_06.png
json形式で吐き出されてます。
CSV形式にしたいので、無理やり感はありますが、日時を付けてカンマで区切ってあげます。
修正したexample.js(一部)は下記です。

	var dt = new Date();
	var formatted = dt.toFormat("YYYY-MM-DD HH24:MI:SS");

      if (args.testMode === 1) {
//         device.publish('topic_2', JSON.stringify({
//            mode1Process: count
//         }));
         device.publish('topic_2', '"'+formatted+'",'+'"'+count+'"');
      } else {
         device.publish('topic_1', JSON.stringify({
            mode2Process: count
         }));
      }

S3に出力するファイルの拡張子も.csvに変更します。
変更は下記にて行います。
マネジメントコンソール → AWS IoT → Rules → Rule名 → Stone massage in an Amazon S3 buket

SkyOnDemandのHTTPトリガスクリプトの作成

AWS Lambdaから送信されたスクリプトはSkyOnDemandで受信し、下記手順を記述したスクリプトを起動します。

  1. S3バケットからCSVを取得する
  2. CSVファイルを読み込む
  3. SalesforceのカスタムオブジェクトへInsertする

イメージはこのようになります。

awsiot_07.png

実際にRasberry Pi から送信してみます。
無事にSalesforceまでデータが到達しました!

awsiot_08.png

今回は最終的にSalesforceへデータを保存しましたが、SkyOnDemandならば様々なアダプタが用意されているので、他のクラウドやオンプレのDBなど保存先はいろいろ選択できます。

おわりに

今回の試行はRasberry Pi からデータが送られる度にSalesforceへデータをInsertしているので、SalesforceのAPIコール数をかなり消費します。
本来であれば、AWSのRDSやDynamoDBにデータを溜めて、集計、解析したものをSalesforceへ連携することを考えた方がよさそうです。
2017年1月22日にサービスインした「 DataSpider Cloud 」では、AWS DynamoDBアダプタが標準装備されたり、トリガ数の制限が無くなったりさらにIoT向けのサービスになっています。
同様に2017年3月に予定されているSkyOnDemandのアップデートでもAWS DynamoDBアダプタやAWS Kinesisトリガが追加され、益々IoTデータ連携機能が強化されます。

2016年はIoTがバズワードになりましたが、最近はAIという言葉のほうがよく聞きます。
AIはビッグデータを活用するアーキテクチャであり、IoTはビッグデータを集めるための「手段」です。
今回の試行のように、デバイスからクラウド、クラウドからクラウドというデータ連携ベースを一つ手の内に入れておくと今後応用が効きます。