ビッグデータを扱うための話

Pythonによるデータ加工での大容量csvの扱い」のような質問を見かけたので、回答を載せておく。普通には扱いないサイズの、ビッグデータを処理するための汎用的な話を残しておきます。

ビッグデータを扱うには、元となるデータを一度に処理可能な量(数十MB程度)に分割して処理を行うバッチ処理か、最小単位のデータ(1行)毎に処理を行うストリーム処理の何れかになります。

ビッグデータを処理するための環境として知られているApatch Hadoopは前者を、RDBに対するSQL文の処理などは後者の方法を使っています。

一行ずつのデータに対して処理を行う場合には、バッチ処理でもストリーム処理でも良いのですが、複数の行にまたがった計算処理を行う場合にはバッチ処理でないと難しくなります。例えば時系列にそった移動平均を計算しようとするなら、元となるデータを時系列順に並び替えなくてはなりません。ですが並び替える処理は多くのメモリを使用するため、データ量が多いと一度にはできません。こういう部分はストリーム処理では実装が難しくなります。

またビッグデータの処理では、どのような中間データを出力するかが肝になります。移動平均を算出する場合には「順不同の元データ→各月毎のデータに分割→各月毎のデータを並び替え→月毎のデータを結合→移動平均算出」のように、何段階かのバッチ処理に分ける必要があるでしょう。この時に汎用性を考えた中間データを作成しておかないと、異なる集計を行う場合に最初からやり直すために、多くの時間がかかる事になります。

Hadoopのプログラムを作るときのMap/Reduceの考え方は、Hadoop以外にも適用できるので、ビッグデータを処理する機会があるならば、Hadoopを使ってみることをお勧めします。既存の仮想マシンのイメージをダウンロードして使うなら、インストールに手間もかかりません。開発言語もJavaに限りません。コンソールからの入出力を処理できる開発言語なら何でも使えます。

Hiveのデータを圧縮して容量を節約する

LOAD DATAステートメントで取込むテキストファイルについては、Hive Language  Manualでもgzip形式で圧縮して取込むサンプルが例示されていますが、CREATE TABLE ~ STORED AS TEXTFILEで指定するテキストデータファイルに対してもgzipで圧縮して容量を節約することが出来ます。

特に気をつけるべき事はありません。テキストファイルを一つずつgzipで圧縮して、指定したフォルダに格納しておくだけです。

また圧縮することで処理速度が向上する場合があります。多くの処理系ではネットワーク帯域よりもメモリ帯域の方が圧倒的に高速なため、gzipで圧縮することによりネットワーク帯域負荷を下げる事によるメリットが、gzipの展開によるCPU負荷増大のデメリットよりも優るためです。

EMRを使う場合、巨大なデータをS3などのストレージに常時置いておくことによって発生する代金は馬鹿になりませんので、圧縮しておくことをお勧めします。

Windows系エンジニアのためのElastic Map Reduce 5

Amazon EMRでHiveを使う
EMRに接続して操作するためにSSHに対応したtelnetクライアントが必要です。私自身長年使っていることもあって、TeraTerm(http://ttssh2.sourceforge.jp/)をお勧めします。
Cluster Listから作成したEMRインスタンスの詳細を表示すると、Master public DNS nameにドメイン名が表示されています。このドメインにSSHで接続すると以下のように表示されます。SSHでログインするときのユーザーIDはhadoopです。

Linux (none) 3.2.30-49.59.amzn1.x86_64 #1 SMP Wed Oct 3 19:54:33 UTC 2012 x86_64
——————————————————————————–

Welcome to Amazon Elastic MapReduce running Hadoop and Debian/Squeeze.

Hadoop is installed in /home/hadoop. Log files are in /mnt/var/log/hadoop. Check
/mnt/var/log/hadoop/steps for diagnosing step failures.

The Hadoop UI can be accessed via the following commands:

JobTracker lynx http://localhost:9100/
NameNode lynx http://localhost:9101/

——————————————————————————–
hadoop@ip-10-121-9-185:~$
hadoop@ip-10-121-9-185:~$ hive

プロンプトが表示されたらhiveと入力して、hiveのインスタンスを起動します。
先ほどS3にアップロードしたファイルをテーブルとしてマウントします。

> Create external table reviewsNew
> (MemberID String, ProductID String, ReviewDate TimeStamp,
> HelpFeeedbackNum Int, FeedbackNum Int, Rating Int,
> Title String, Body String )
> row format delimited fields terminated by ‘t’ lines terminated by ‘n’
> stored as textfile location ‘s3://emr-sample/reviewsNew’

これで集計をおこなう準備が完了しました。

> select size(split(Body, ‘ ‘)) as wordscnt, count(*)
> from reviewsNew
> group by size(split(Body, ‘ ‘))
> sort by wordscnt;

上記のようなselect文を実行して集計してみます。bodyに含まれる単語数をカウントして、各単語数のレビューが何件あるかを集計します。この手のインデックスが全く機能しないようなSQLをRDBMSで実行した場合、現実的な時間内には終了しません。hiveなら5ノード程度の小さなクラスタ構成でも7分ほどで集計が終わります。
実際に使用する場合には以下のように結果保存用のテーブルを作成しておきます。

> create external table wordscnt
> (wordcnt Int, recordCnt Int)
> row format delimited fields terminated by ‘t’ lines terminated by ‘n’
> stored as textfile location ‘s3://emr-sample/wordscnt’

そして以下のように保存先テーブルを指定し、結果保存用のテーブルに格納します。その後、S3から作成されたデータファイルをダウンロードします。

> insert into table wordscnt
> select size(split(Body, ‘ ‘)) as wordscnt, count(*)
> from reviewsNew
> group by size(split(Body, ‘ ‘))
> sort by wordscnt;

Windows系エンジニアのためのElastic Map Reduce 3

AmazonでEMRを起動する
まずはEMRで使用するKeyPairを作成します。既に作ってある人は、それを流用してもかまいません。AWSマネジメントコンソールにログインし、EC2のコンソールを開いたら、左側のメニューからKey Pairsを選択します。Create Key Pairをクリックしてダイアログが表示されたら、Key Pair Nameを入力してYesをクリックします。自動的に鍵のダウンロードが始まるので、ダウンロードされたファイルを保管します。
続いてEMRのインスタンスを起動します。AWSマネジメントコンソールのElastic Map Reduceを選択し、Create clusterボタンをクリックして下さい。
Cluster ConfigurationのCluster Nameには適当に識別可能な名称を設定します。Loggingはチェックを外して下さい。
Software Configurationは初期設定のままでかまいません。Hadoop distributionはAmazonにチェックを、AMI Versionは初期設定で2.4.2になっているので、そのままにします。Applications to be installedにも初期設定でHiveとPigが含まれているはずです。
Hardware Configurationには若干のHadoopについての知識が必要です。NetworkとEC2 availability zoneは初期設定のまま、EC2 ClassicとNo preferenceにします。問題となるのはEC2 instance typeでHadoopを構成する仮想マシンノードの数と種類を指定します。
Masterは必ず1台必要になるノードで、Hadoopの分散処理に参加する全てのコンピュータを制御する中心となるノードです。構成する台数が増えた場合、Masterノードのスペックが低いと全体の処理性能が低下する恐れがあります。Masterノードは次に説明するCoreノードとしての機能も兼ねます。
Coreは必ず2台以上必要になるノードです。Hadoopの分散処理においてHDFSと言う分散ストレージを提供しています。EMRではストレージとして主にS3を使用するので、あまり容量を必要としませんが、Hive等のミドルウェアが一時的な記憶領域としてHDFSを使用する場合があります。途中でノードが停止した場合に備えるのと、読み取り負荷分散のため、三重に分散して保存するので、意外に容量を必要とする場合があり、必要に応じて増やします。
Taskは純粋に計算処理を担当するノードです。割り当てない事も出来ます。
Master、Core、TaskともにSpot Instanceを指定して料金を節約することが出来ますが、Masterノードが停止した場合には即座に処理が中断してしまいますし、Coreノードが2/3以上停止するとHDFSに保存したデータが消失してしまい、処理を継続出来なくなる可能性があります。練習中は良いですが、本番ではTaskノード以外をSpot Instanceにすることは避けた方が良いでしょう。
Security and AccessのEC2 key pairを設定して下さい。EMR起動後にSSHでログインするために公開鍵が必要になります。
最後にStepsのAuto TerminateはNoに設定します。

Windows系エンジニアのためのElastic Map Reduce 2

Hive用データを準備する
HiveはCSV形式やTSV形式のファイルに対して処理を行ないます。処理対象とするファイルを事前に変換しておきます。
ファイルは数10MBから数百MB程度のサイズで、複数のファイルに分割しておきます。Hiveは一つのファイルを一つのMapperプロセスに割り当てます。検索対象となるファイルの数が処理を分散できるノード数の上限になるので、必ず適度なサイズに分割しておかないと、Hadoopのメリットを得られません。
Hiveでは一つのフォルダを一つのテーブルとして扱います。ここで作成した分割ファイルをひとつフォルダに保存します。
通常のHadoopではHDSFを使用しますが、Amazon EMRではS3をストレージとして使用します。EMR用にS3のBucketを作成し、フォルダを作り、そこに作成したファイルをアップロードします。ここで作成したフォルダがHive上では一つのテーブルとして扱います。
Bucketを作成するリージョンは、EMRで使用するリージョンと同じ場所にします。同一リージョン内でのネットワーク通信は課金の対象となりませんが、リージョンをまたいだ通信は課金の対象となるので注意しましょう。
同様の理由でCSV形式のファイルへの変換や、CSVファイルをS3にアップロードする作業もAmazon EC2上で行なった方が良い場合もあります。EC2上の仮想マシンからS3へのアップロードは低速のインスタンスでも200~400Mbpsで行えますが、インターネット経由ではシングルスレッド転送だと数百Kbps~数Mbps程度になってしまい、大量のデータをアップロードするには時間がかかりすぎます。
都合、インターネット経由でアップロードする場合には、S3 Browser(http://s3browser.com)のような、マルチスレッド対応した転送ツールが便利です。
ここではサンプルデータとしてインターネット上で公開されているAmazonのレビュー記事のデータを使います。http://liu.cs.uic.edu/download/data/からreviewsNew.rarをダウンロードしてきます。解凍して得られたファイルを適当な行数で分割します。私はCygwinのsplitコマンドを使用しました。これを新たにemr-sampleバケットを作成し、reviewsNewフォルダにアップロードします。

Windows系エンジニアのためのElastic Map Reduce 1

初めに・・・
MapReduceが世に出て早10年、Amazon Elastic Map Reduce(以降EMR)のようにパブリッククラウド上でHadoopを使った分散処理を行えるようになって5年ほどが過ぎました。実際活用しようとするとLinuxを中心とするOpenSource界隈の知識が必要になるため、Windows系のエンジニアからはどうしても遠い存在になりがちです。
ところが最近EMRのWEBインターフェースが刷新されました。以前はコマンドラインでの操作が必須だったのが、WEBからの操作でもずいぶんと使いやすくなりました。この機会にEMRを使いこなしてBigDataの処理など学びたいと思います。
さてBigData処理には大別して二つあります。一つは大量の非構造化データを扱う処理。二つ目は大量の構造化データを扱う処理です。非構造化データの処理は自然言語処理、機械学習といった高度なソフトウェアサイエンスの知識が必要不可欠で、Hadoopで分散処理ができるようになったから一朝一夕に実現できる類のものではありません。二つ目の構造化データなら高度なコンピュータサイエンスの知識がなくとも、手元のデータを解析して何らかの答えを得ることは可能です。だから、ここでは構造化データを扱うことを最初の目標としたいと思います。
構造化データならRDBを使えばHadoopなんか要らないのでは?と考えるかもしれませんが、RAWレベルのログデータなんかを1年分集めると、中小企業でも数十GB~数百GBに達してしまいローレベルのサーバーでは到底解析できないサイズになってしまいます。これを解析できるサーバーを別途用意するというのは、中小企業にとっては大きなハードルなのです。
Hadoopの基本はMap Reduceにあるわけだけど、実際に使うためにはプログラムを開発せざる得ない。そしてプログラムを開発するとなると、テストやら何やらで結構時間がかかるもの。特に私みたいなWindows中心の開発者だと、Linux環境でのプログラミングに不慣れなこともあって、やたらと時間がかかったりします。でも構造化データを扱うならばMap Reduceは必要ありません。HadoopにはHiveやPigといった構造化データを手軽に扱うための仕組みが用意されています。
HiveはHadoop上で動作する簡易RDBです。一般的なRDBと異なりSELECT文は充実していますが、INSERTやUPDATEと言ったデータを更新するための機能が著しく制限されています。カンマ区切りやタブ区切り形式で保存されたテキストファイルや、Hive独自形式のデータベースファイルに対して集計処理を行えます。Hiveに与えたSQL文は数段階の処理に分解されMap Reduceアプリケーションにより集計されます。下手なMap Reduceアプリケーションを作るより余程高速に動作するので、まずはHiveを使った集計処理を学びたいと思います。