Skip to main content

セキュリティイベントログの分析

3 タスク

20 分

Visible to: All users
上級
Pega Platform 8.3.1
Pega Platform 8.5
Pega Platform 8.6
日本語

シナリオ

Front Stageでは、誰かのユーザー名とパスワードが、おそらくフィッシング攻撃によって漏洩した可能性を危惧しています。 FSGは、Bookingアプリケーションにアクセスするすべての人がジオロケーションを有効にしなければならないという制限を課すことにしました。 ジオロケーションが有効でない場合、Bookingアプリケーションへのアクセスは許可されません。

FSGは、カスタムセキュリティイベントをログする際に、各ユーザーのpxRequestorのpxLatitudeとpxLongitudeを書き出したいと考えています。 また、FSGは各イベントのタイムスタンプを記録することで、マイル/時(mph)やkm/時(kph)といった速度の計算を簡単に行えるようにしたい考えています。 同一のユーザーが2件のセキュリティイベントを記録する際の移動速度が妥当な範囲を超えた場合、FSGはそのユーザーが誰であり、その2件のイベントがどこで発生したのかを知りたいと考えています。

ジオロケーションが有効になっていない場合、Bookingアプリケーションへのアクセスが拒否されるようにする必要はありません。 代わりに、同じユーザーについてPegaRULES-SecurityEvent.logファイルに記録された2件のイベント間の速度を効率的に計算する方法を考案します。 速度が妥当な値を超えた場合、その2件のイベントを出力します。

このチャレンジをクリアするには、Linux Lite VMを使用する必要があります。

このチャレンジを完了するには、Pegaインスタンスを起動する必要があります。

起動には5分ほどかかることがありますので、しばらくお待ちください。

詳細なタスク

1 ソリューション詳細の確認

この演習のソリューションを導き出すには、Apache SparkとRumbleをインストールする必要があります。

  • インストラクションを読み、Rumble:Installing Sparkをインストールします。
  • Apache Sparkをダウンロードします。
  • Rumblespark-rumble-1.0.jarなど)をダウンロードします。

2 Apache SparkとRumbleのインストールと構成

  1. VM内で動作するブラウザーを使用して、Apache Sparkをダウンロードします。 インストラクションに従い、次のコマンドを実行して、ダウンロードしたファイルを/usr/local/binに移動させます。

    tar zxvf spark-2.4.4-bin-hadoop2.7.tgz
    sudo mv spark-2.4.4-bin-hadoop2.7 /usr/local/bin

  2. 次のコマンドを実行して、Apache Sparkを環境変数PATHに追加します。

    export SPARK_HOME=/usr/local/bin/spark-2.4.4-bin-hadoop2.7
    export PATH=$SPARK_HOME/bin:$PATH

  3. 次のコマンドを実行して、ステップを確認します。

    cd $SPARK_HOME/bin
    spark-submit –version
    cd $HOME
    spark-submit –version

  4. VM内で動作するブラウザーを使用して、Rumbleをダウンロードします。
  5. Rumbleが実行されるディレクトリを作成します(例:$HOME/pega8/rumble)。
  6. ダウンロードした spark-rumble-1.0.jarを、作成したディレクトリに移動します。

Rule-Utility-Libraryに「Security」という名前のRule-Utility-Functionsを定義する

Security • LogCustomEventValueGroup() Void

  • パラメーター:eventType String、outcome String、message String、customFlds ClipboardProperty
  • 説明:customFldsはText ValueGroupでなければなりません。 カスタムのセキュリティイベントを記録したい場合に、このFunctionを呼び出します。

Security • NumericTimestamp() String

  • パラメーター:unit String、許可される値 = "h"、"m"、"s"
  • 説明:現在のタイムスタンプを時、分、秒の単位で文字列として返します。

FSG-Booking-Work OpenDefaults拡張ポイントを上書きする

OpenDefaults

ジオロケーションとカスタムセキュリティイベントのログを有効にする

Chromeブラウザーは、サーバーがセキュアでない限り、ジオロケーションリクエストをサポートしません。 Chromeは、Linux VMのIPアドレスがlocalhostという名前にマッピングされている場合に、そのVM内でジオロケーションリクエストを許可します。

例:

pega8@Lubuntu1:~$ more /etc/hosts
192.168.118.145 localhost
127.0.0.1 localhost
127.0.1.1 Lubuntu1

  1. localhostの最初のエントリーには、ifconfig IPアドレスを入力します。
  2. ping localhostを実行してテストします。

    WhenルールのpyGeolocationTrackingIsEnabledは、trueを返さなければなりません。

    Data-Portalに適用されるWhenルールでは、ログイン後に次のようなポップアップウィンドウが表示されます。
Location access
  1. Click Allow Location Access
  2. クリップボード上のpxRequestorページ内に設定されているpxLatitiudeとpxLongitudeを確認します。
  3. ヒント: 最良の結果を得るために、ブラウザーはFirefoxが推奨されます。
  4. Security > Tools > Security > Security Event Configuration」をクリックして、「Security Event Configuration」ランディングページを開きます。
  5. ランディングページの下部にある「ON 」をクリックして、カスタムイベントを有効にします。
  6. 「Submit」をクリックします。
    Custom event
  7. レビューのために「Event」ケースを開き、クリップボードのpyWorkPageまたはTrace OpenDefaultsを確認すると、次の画像のようにcustomFldsのTextバリューグループの値が表示されます。
    Custom fields
  8. System > Operations > Logsをクリックします。
  9. SECURITYEVENTの右側にある「Text 」リンクをクリックした後、「Log Files」をクリックします。
  10. 要求された場合は、ユーザー名「admin」とパスワード「admin」を入力します。
  11. SecurityEvent.logファイルを開き、customFldsの値が記録されているかどうかを確認します。

数値を囲む引用符を削除する(JSONフォーマットで必要)

以下の手順で、数値を囲む引用符を削除します(ソースはこちらをクリック)。

  1. 正規表現で調べたいテキストをコピー&ペーストします。
  2. 負の値になってもよい場合は、各値を(-?[0-9]*\.[0-9]*)に、負の値になってはならない場合は各値を([0-9]*\.[0-9]*)に置き換えます。
  3. 解析された値の行き先を指定するには、\1、\2などを使用します。
補足: find クローズとreplace クローズの間にスラッシュを入れる必要はありません。 最初の「s」の後に表示される文字が、ステートメントの区切り文字になります。 ステートメントの最後にはその区切り文字を使用し、findとreplaceを分ける際には、ステートメントの途中で使用する必要があります。
BEFORE text.txt
{"id":"25538d8d-2861-40a9-b6df-d289e4b73a7e","eventCategory":"Custom event","eventType":"FooBla","appName":"Booking","tenantID":"shared","ipAddress":"127.0.0.1","timeStamp":"Mon 2019 Aug 05, 19:31:42:060","operatorID":"Admin@Booking","nodeID":"ff9ef7835fd4906aea82694c981938d0","outcome":"Fail","message":"FooBla failed","requestorIdentity":"20190805T192510","lon":"-98.0315889","lat":"30.123275"}
cat text.txt | sed -r 's/"lon":"(-?[0-9]*\.[0-9]*)","lat":"(-?[0-9]*\.[0-9]*)"}/"lon":\1,"lat":\2}/'
AFTER text.txt
{"id":"25538d8d-2861-40a9-b6df-d289e4b73a7e","eventCategory":"Custom event","eventType":"FooBla","appName":"Booking","tenantID":"shared","ipAddress":"127.0.0.1","timeStamp":"Mon 2019 Aug 05, 19:31:42:060","operatorID":"Admin@Booking","nodeID":"ff9ef7835fd4906aea82694c981938d0","outcome":"Fail","message":"FooBla failed","requestorIdentity":"20190805T192510","lon":-98.0315889,"lat":30.123275}

改行を1行ずつではなく、ファイル全体で一度に置換する

Unixスクリプトで読み取られるファイルに改行があると、各行が個別に処理されます。 ファイル全体を1つの長い文字列として扱い、その文字列の中で改行を置換する必要があります(ソースはこちらをクリック)。

改行コードを削除するsedコマンドの出力は再びsedにパイプされますが、今回は、「}<space>{」が「}<comma>{」に置換されます。

sed -e ':a' -e 'N' -e '$!ba' -e 's/\n/ /g' PegaRULES-SecurityEvent.log | sed 's/} {/},{/g'

1つ目のファイル「test2.jq」は、JSON配列の左括弧を含むクエリの開始点として機能します。let $log := [.

2つ目のファイル「restof_test1.jq」には、JSON配列の右括弧から始まるJSONiqクエリの末尾を記述します。

  1. スクリプトにより、新しいクエリファイルが「> test2.jq」を使って初期化されます。
  2. 続いて、「>> test2.jq」を使って、連続したsedコマンドが出力されます。
  3. 次に「>> test2.jq」を使ってrestof_test1.jqが付加されます。
JSONLogToArray.sh
#!/usr/bin/env bash
set -x

echo "let \$log := [" > test2.jq

cat PegaRULES-SecurityEvent.log
| sed -r 's/"lon":"(-?[0-9]*\.[0-9]*)"/"lon":\1/' \
| sed -r 's/"lat":"(-?[0-9]*\.[0-9]*)"/"lat":\1/' \
| sed -r 's/"ts":"([0-9]*\.[0-9]*)"/"ts":\1/' \
| sed -e ':a' -e 'N' -e '$!ba' -e 's/\n/ /g' \ | sed 's/} {/},{/g' >> test2.jq

cat restof_test1.jq >> test2.jq

次に、JSONiqクエリの残りの部分(restof_test1.jq)が処理されます。 2つの座標間の距離がマイルで計算され(3958.8は地球の半径(マイル))、それが2件のログファイルエントリーの時間差で除算されます。 タイムスタンプは、1-1-1970 GMTからの時間として計算されます。

フィルター条件により、行のペアは以下の場合にのみ検査されます。

  1. eventTypeがOpen Work(上記のFSG-Booking-Work OpenDefaultsの上書きを参照)である。
  2. 2件のレコード内のoperatorIDの値が同一である。
  3. 両方の行の緯度が20より大きい(これは、両方の行にジオロケーション座標が含まれていることを確認するために使用されます。 任意の比較を使用できます)。
  4. 2行目のタイムスタンプは、1行目のタイムスタンプより後である(これにより、速度がマイナスになってしまうような冗長な計算を避けることができます)。
restof_test1.jq
let $pi := 3.1415926
let $join :=
for $i in $log[], $j in $log[]
where $i.eventType = "Open Work"
and $i."id" != $j."id"
and $i."operatorID" = $j."operatorID"
and $i.lat>20 and $j.lat>20
and $j.ts>$i.ts
let $lat1 := $i.lat
let $lon1 := $i.lon
let $lat2 := $j.lat
let $lon2 := $j.lon
let $dlat := ($lat2 - $lat1) * $pi div 180
let $dlon := ($lon2 - $lon1) * $pi div 180
let $rlat1 := $lat1 * $pi div 180
let $rlat2 := $lat2 * $pi div 180
let $a := sin($dlat div 2) * sin($dlat div 2) + sin($dlon div 2) * sin($dlon div 2) * cos($rlat1) * cos($rlat2)
let $c := 2 * atan2(sqrt($a), sqrt(1-$a))
let $distance := $c * 3958.8
let $tdiff := $j.ts - $i.ts
let $mph := $distance div $tdiff
return { "id" : $j.id, "eventType" : $j.eventType, "distance":$distance, "mph":$mph}
return [$join]

シェルモードでコマンドを実行するのではなく、JSONiqクエリをApache Sparkに送信する

クエリファイルのコンテンツをApache SparkとRumbleに送信する際に、ファイルを開き、すべて選択し、コピーし、Rumbleのコマンドラインに貼り付けるよりも簡単な方法を使用できれば理想的です(ソースはこちらをクリック)。

  1. --shell yes」ではなく
  2. --query-path file.jq --output-path results.out」を使用します。
  3. Apache SparkがインストールされているシステムのRumble .jarが存在するディレクトリで、次のコマンドを実行します。
pega8@Lubuntu1:~/rumble$ spark-submit --master local[*] --deploy-mode client spark-rumble-1.0.jar --query-path "test2.jq" --output-path "test2.out"
Example test2.out
[ { "id" : "f5b07887-11ef-4f6f-9f0f-efb060bd3cd7", "eventType" : "Open Work", "distance" : 31.6128421996284034764, "mph" : 2634.4035166357 }

「example test2.out」は次のように解釈できます。

2件のセキュリティイベントが短時間に連続して記録されました。 この2件のセキュリティイベントが記録された時間内に31.6マイル(50.9km)の距離を移動するには、時速2634マイル(4329km)で移動しなければなりませんでした。

3 ソリューションダウンロードのレビュー



このモジュールは、下記のミッションにも含まれています。

トレーニングを実施中に問題が発生した場合は、Pega Academy Support FAQsをご確認ください。

このコンテンツは役に立ちましたか?

改善できるところはありますか?

We'd prefer it if you saw us at our best.

Pega Academy has detected you are using a browser which may prevent you from experiencing the site as intended. To improve your experience, please update your browser.

Close Deprecation Notice