ラベル StreamScale の投稿を表示しています。 すべての投稿を表示
ラベル StreamScale の投稿を表示しています。 すべての投稿を表示

2011年4月20日水曜日

演習課題 - Elastic StreamScale

石井、鈴村の「Elastic Stream Computing with Clouds」のアイデアを StreamScale 上でシームレスに実装し、性能評価を行う。

竹野君 ToDo
(1) 石井君の work 理解 --> "SACSIS 2011 論文とスライド読み”, 実験の再現
(2) Yahoo S4 理解(コードと論文)
(3) StreamScale のコード理解 (5/11 18:00 - )
(4) StreamScale の再設計と実装 (竹野、上野、鈴村で議論)
(5) 上記 Elastic Stream Computing のアイデアを StreamScale 上に実装

以上です

2011年2月9日水曜日

研究テーマ候補

研究テーマ候補のリマインド

1. メモリ使用量を考慮した実行時スケジューリング及びマイグレーション
http://suzumura-lab.blogspot.com/2010/07/blog-post_28.html

2. 階層的ストレージ構成を透過的に活用するデータストリーム処理系(例:大規模グラフ)
http://suzumura-lab.blogspot.com/2010/12/streamgraph_4464.htmlにおける以下の問題
メモリに載りきらないようなグラフストリームが到着した際に Persistent な分散ファイルシステムと協調し階層的にグラフ構造を格納、頻繁にアクセスされる部分グラフのみをオンメモリ上に確保するといった最適化手法、そしてそれらのグラフデータに対して開発者からシームレスにアクセスできるような処理基盤

3. GIM-V モデルのGPU実装
大規模グラフの処理モデルGIM-Vの処理系を GPU (+分散用に StreamScaleまたは System S) を用い、実装.  関連研究→Mars

4. Incremental GIM-V 処理系の発展
- 耐故障性: ノード故障におけるマイグレーション・スケジューリング問題
- GPUを用いた高速化:完全に再計算すべきサブグラフのGPUによる処理

5. 超並列異機種混在計算機環境におけるX10を用いたグラフ処理の最適化
並列プログラミング言語 X10を用いて大規模グラフ処理を記述し、CPUとGPUから構成される超並列異機種混在環境(例:TSUBAME2.0) 上における性能最適化を実現する。また、そこに介在する様々な技術的課題(プログラマビリティ、性能、自動生成されるGPUコードなど)を特定し、X10の実行時処理系にその課題を解決する機能を実装し、性能向上を実現する. グラフ処理に関しては、GIM-VのモデルをX10で記述するのも良いでしょう。

2010年11月5日金曜日

MapReduce Online

http://www.eecs.berkeley.edu/Pubs/TechRpts/2009/EECS-2009-136.html


MapReduce is a popular framework for data-intensive distributed computing of batch jobs. To simplify fault tolerance, the output of each MapReduce task and job is materialized to disk before it is consumed. In this paper, we propose a modified MapReduce architecture that allows data to be pipelined between operators. This extends the MapReduce programming model beyond batch processing, and can reduce completion times and improve system utilization for batch jobs as well. We present a modified version of the Hadoop MapReduce framework that supports online aggregation, which allows users to see "early returns" from a job as it is being computed. Our Hadoop Online Prototype (HOP) also supports continuous queries, which enable MapReduce programs to be written for applications such as event monitoring and stream processing. HOP retains the fault tolerance properties of Hadoop, and can run unmodified user-defined MapReduce programs.

[StreamScale] ZooKeeper: Wait-free coordination for Internet-scale systems

http://research.yahoo.com/pub/3280

ZooKeeper: Wait-free coordination for Internet-scale systems

Authors:Hunt, P.; Konar, M.; Junqueira, F.P.; Reed, B.
Source: USENIX Annual Technology Conference (2010)

Abstract: In this paper, we describe ZooKeeper, a service for coordinating processes of distributed applications. Since ZooKeeper is part of critical infrastructure, ZooKeeper aims to provide a simple and high performance kernel for building more complex coordination primitives at the client. It incorporates elements from group messaging, shared registers, and distributed lock services in a replicated, centralized service. The interface exposed by Zoo- Keeper has the wait-free aspects of shared registers with an event-driven mechanism similar to cache invalidations of distributed file systems to provide a simple, yet powerful coordination service. The ZooKeeper interface enables a high-performance service implementation. In addition to the wait-free property, ZooKeeper provides a per client guarantee of FIFO execution of requests and linearizability for all requests that change the ZooKeeper state. These design decisions enable the implementation of a high performance processing pipeline with read requests being satisfied by local servers. We show for the target workloads, 2:1 to 100:1 read to write ratio, that ZooKeeper can handle tens to hundreds of thousands of transactions per second. This performance allows ZooKeeper to be used extensively by client applications.

[StreamScale] S4: Distributed Stream Computing Platform

http://s4.io/
http://labs.yahoo.com/node/476

S4: Distributed Stream Computing Platform
Authors: Neumeyer L, Robbins B, Nair A, Kesari A

Abstract: S4 is a general-purpose, distributed, scalable, partially fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data. Keyed data events are routed with affinity to Processing Elements (PEs), which consume the events and do one or both of the following: (1) emit one or more events which may be consumed by other PEs, (2) publish results. The architecture resembles the Actors model [1], providing semantics of encapsulation and location transparency, thus allowing applications to be massively concurrent while exposing a simple programming interface to application developers. In this paper, we outline the S4 architecture in detail, describe various applications, including real-life deployments. Our de- sign is primarily driven by large scale applications for data mining and machine learning in a production environment. We show that the S4 design is surprisingly flexible and lends itself to run in large clusters built with commodity hardware.

- PE がアクター(エージェント)となり、各イベントを処理していくモデル
- PE の開発者は、2つのメソッド processEvent と output メソッドを実装する。processEvent は受けたイベントを処理して、出力ストリームに渡す。output メソッドは、外部のコンポーネントから呼び出され、Aggregate のように定期的にバッファリングされたデータに対して処理するようなことができる
- 複数の PE は疎な結合となっており、どの PE とどの PE がつながっているかは、Spring フレームワークの Beans の記述として記述する。PE 同士の接続をアプリケーション側では書かない。
- Hadoop 上で実装されている、というネット上のコメントがあるが、それは全くの誤り。Hadoop 上では実装されていない。
- すべての PE は 対称的で、マスターデーモンのような存在はない。

- S4 のサンプルプログラムでは、Twitter Stream API を用いてトピックの数をカウントするアプリケーションを実装

2010年10月22日金曜日

[StreamGraph] グラフデータベース

グラフデータベース
http://www.infoq.com/jp/articles/graph-nosql-neo4j

[StreamScale] ビルド、コード管理など

- ビルド ant
- コード管理 sourceforge + SVN
- パフォーマンス測定ツール: JMeter
- ログ: Log4J

[StreamScale] JVM リモート監視

JMX を使いましょう

http://nadeausoftware.com/articles/2008/03/java_tip_how_get_cpu_and_user_time_benchmarking#UsingaSuninternalclasstogetJVMCPUtime
など

[StreamScale] 通信レイヤ

Apache MINA

MINA is a simple yet full-featured network application framework which provides:

Unified API for various transport types:
TCP/IP & UDP/IP via Java NIO
Serial communication (RS232) via RXTX
In-VM pipe communication
You can implement your own!
Filter interface as an extension point; similar to Servlet filters
Low-level and high-level API:
Low-level: uses ByteBuffers
High-level: uses user-defined message objects and codecs
Highly customizable thread model:
Single thread
One thread pool
More than one thread pools (i.e. SEDA)
Out-of-the-box SSL · TLS · StartTLS support using Java 5 SSLEngine
Overload shielding & traffic throttling
Unit testability using mock objects
JMX managability
Stream-based I/O support via StreamIoHandler
Integration with well known containers such as PicoContainer and Spring
Smooth migration from Netty, an ancestor of Apache MINA.

[StreamScale] DB

StreamScale のランタイムですが、Apache プロジェクトで利用できるコンポーネントは積極的に使っていきましょう。

- Apache Derby : RDB (Pure Java)
- Apache MINA : 効率的な通信レイヤの実装用
- APR (Apache Portable Runtime Project)
- Cassandra (http://cassandra.apache.org/)

2010年10月19日火曜日

[StreamScale] Java と Infiniband

StreamScale のノード間通信に重要


Efficient Java Communication Libraries over InfiniBand


Guillermo L. Taboada

http://www.computer.org/portal/web/csdl/doi/10.1109/HPCC.2009.87

This paper presents our current research efforts on efficient Java communication libraries over InfiniBand. The use of Java for network communications still delivers insufficient performance and does not exploit the performance and other special capabilities (RDMA and QoS) of high-speed networks, especially for this interconnect. In order to increase its Java communication performance, InfiniBand has been supported in our high performance sockets implementation, Java Fast Sockets (JFS), and it has been greatly improved the efficiency of Java Direct InfiniBand (Jdib), our low-level communication layer, enabling zero-copy RDMA capability in Java. According to our experimental results, Java communication performance has been improved significantly, reducing start-up latencies from 34μs down to 12 and 7μs for JFS and Jdib, respectively, whereas peak bandwidth has been increased from 0.78 Gbps sending serialized data up to 6.7 and 11.2 Gbps for JFS and Jdib, respectively. Finally, it has been analyzed the impact of these communication improvements on parallel Java applications, obtaining significant speedup increases of up to one order of magnitude on 128 cores.

[StreamScale] 機能

先日の機能要件で抜けている機能要件の一部。全部は最初は実装しないので、要件を一通り洗い上げて、最初のバージョンでどれを実装すれば良いかを選択するようにしましょう。

エンジン (SSR)
- データレート、CPU 使用率の計測

可視化ツール
- ストリームモニタリングツール(ちゃんとタプルが流れているかどうかを可視化する)
- どのオペレータがどのノードに今アサインされているかを可視化
(最初はテキストベースでも良いでしょうが、ないと不便でしょう)

[StreamScale] SSR (StreamScale Runtime) のリモート起動

自前でシェルスクリプトなどを用意するのでも良いが、GXP を使えばよいだろう。ただ、ちゃんとポータブルな実装になっているかどうかを確認する必要があるが。


リモートの JVM の監視ツールとしては以下のようなツールも存在する。
jps - Java Virtual Machine Process Status Tool
http://download.oracle.com/javase/6/docs/technotes/tools/share/jps.html

[StreamScale] 実装時に参考になるであろう情報

Efficient data transfer through zero copy
http://www.ibm.com/developerworks/library/j-zerocopy/

[StreamScale] 設計

概要設計(10月中いっぱいまでを予定):2週間程度

第1回目: Programming Model / API (2010/10/13)
第2回目: Runtime Architecture (2010/10/18)
第3回目: 第1回目、第2回目のまとめ、他に必要な機能の議論

詳細設計:2週間程度 (11月1週目、2週目)
- API および SPI 決定

実装(第1ラウンド:11月中旬~12月下旬)
- 分担
- テストケース: Test First の思想に基づく

Agile な開発方式をするので、
まず第一プロトタイプを年内に完成させるのが目標

2010年8月16日月曜日

[StreamScale] 実装時の参考文献、参考技術

[スケーラビリティ]
- Doug Lea による Java NIO を用いたサーバーサイド実装のスライド "Scalable IO in Java"
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

- JDK 6 の NIO パッケージの新機能 http://download.oracle.com/javase/6/docs/technotes/guides/io/enhancements.html#6
java.nio.channels.SelectorProvider クラスの実装が、Linux Kernel 2.6 から導入された epoll システムコールを使用するように書き換えられている

- Distributed computing using Java: A comparison of two server designs

- NPTL (Native Posix Threading Library) : POSIX スレッドのアプリケーションをOS レベルで効率的に動作させる機能。Linux Kernel 2.6 以降から導入されたが、Java の Non-Blocking IO などの必要不可欠性が薄まりつつある可能性もある

[コンポーネント化]
- Apache Cocoon: Spring ベースのソフトウェアコンポーネント化ライブラリ。Matt Welsh の SEDA (Staged Event Driven Architecture) を踏襲. OSGi とも類似している

2010年7月28日水曜日

データストリーム処理におけるメモリ消費量を考慮したジョブスケジューリング

 データストリーム処理のアプリケーションでは、Aggregate(条件にマッチするまでメモリ上に蓄えられる) や Join (複数ストリームの待ち合わせ) など、オンメモリ上にデータを蓄えて処理するような処理が必要になるものがある。このとき、Split オペレータなどによってデータのある属性のハッシュ値を元にデータを並列ノードに分散させる場合、データの偏りが生じ、物理メモリを使い果たす場合がある。例えば、CDR 処理においては、ある特定のユーザーの通話履歴が極端に多い場合、そのユーザーのハッシュ値を担当する物理ホストのメモリ量では賄い切れなくなる。このような問題に対応する為、実行時にデータの偏りとメモリ使用量を定期的に監視し、負荷の均衡が保たれなくなった場合には、負荷を実行時に動的に均等にする機構が必要となる。

 実行時に動的に均衡を適正に保つ上で、いくつかの条件を満たす必要があるが、その例を次に示す。

- 溜めていたデータはロスしてはならない。すべての溜めていたデータは正しくマイグレーション先に送られる。マイグレーション先への負荷を最小限に留める。

- データは留め止めなく流れてくるため、上流のオペレータのバッファで滞留しないように、瞬間的にマイグレーションできなければならない

2010年7月14日水曜日

新たなデータストリーム処理系を作る動機

 今年後半から、研究室発のデータストリーム処理系を設計及び開発していこうと思いますので、そろそろ概要設計ぐらいには取り掛かりたいと思います。

 処理系の実装の動機は「System S のランタイム周りの制約から離れたい」というのが大まかな動機です。例えば、現在感じているだけでも以下のような制約があります。

(1) ランタイムの制御、特にスケジューラ周りに手を入れられなく、その周辺の研究に手を出しにくい。たとえば、今のスケジューラは CPU 使用率しか見ていませんが、メモリ使用率やリソース(データベースなど)の局所性を生かした作りになっていません. オペレータの最適配置問題を最適化問題に落として解こうと思っても、その結果をランタイムに反映できません

(2) 高信頼性(つまり必ずデータロスはない)を実現するデータストリーム処理系を実装するには、現在は、System S の外でその機構を作らないといけません。ランタイム自身にその機構を加えることができるはずです。

(3) 鈴村の SYSTOR 2010 で発表した論文では、入力データの自動分散化がスケーラビリティを出すには重要と書きましたが、System S でも強引にやればできますが、自然な形でランタイムでその機構が作れるはずです。

(4) システム自身が様々なパッケージに依存しており、「Write once, run anywhere」 ではありません. Amazon 上で動かしたり、TSUBAME などの巨大クラスタ上でスケーラビリティテストをやりたくてもできません。

 実装言語は Javaにします. 性能は多少犠牲にしますが、プラットフォーム独立性や開発生産性およびメンテナンス性を考え, Java で実装します。特に金融のようなマイクロ秒で勝負するような処理系を作ることを目的にしていないため、レイテンシなりスループットが少し下がっても問題ないでしょう。また、GPU との親和性は当然悪くなり、JNI 経由呼び出しになります。その代わり、Python や Ruby, PHP と言ったスクリプト言語の処理系も JVM ベースのものが最近はあるため、UDOP に相当するようなユーザ定義オペレータをそれらのスクリプト言語でも記述することも当然可能になります。

ただ、最後に断っておきますが、当然、System S も併用していく予定です。ストリームマイニングやアルゴリズム周りの研究、そして GPU との統合は System S 上でも可能なはずです。

2010年5月29日土曜日

StreamScale: データストリーム処理系の実装

 そろそろ、うちの研究室でのデータストリーム処理系の設計を始めようと思います。やはり、コンポーネント化が重要であるので、OSGi のバンドル機構を使ってどのように実装していくかを調査していきましょう。

OSGi は Eclipse のプラグインなどに使われていますが、Spring, JBOSS, WebLogic, GlassFish, WebSphere Application Server v7.0 などのサーバーサイドでも使われだしています。以下は、WAS v7.0 の OSGi フレームワークの使われ方を開発したものです。チェックして見て下さい。


2010年5月20日木曜日

[Data Distribution / StreamScale] データストリーム処理におけるデータ分散最適化

来週からイスラエルで始まる 国際会議 SYSTOR 2010 では、ETL (Extract-Transform-Load) 処理にデータストリーム処理をした際の様々な性能解析と性能最適化に関する論文を発表します。

さて、この研究の続きとして以下のことを考えています。

SYSTOR の論文では、データを計算側に送る供給量が並列環境では十分ではなく、計算側のノード数、コア数を増加させたときにスケーラビリティが得られないという問題点をあげています。その解決策として、データの供給側を増やすことによって、リニアな性能向上を達成できたというのがこの研究の貢献です。この論文を受けて、以下の研究テーマを考えています。

1. 自動最適化: 先行研究では、供給側の増加は、供給側の分割数を試行錯誤しながら行いましたが、自動的に最適な分割数を探索し、かつプログラム変換によって自動最適化を行う手法を考えます。また、計算側のオペレータは、事前にプログラマによって SPLIT などによって、並列化していることを前提としていますが、開発者は論理的にデータ読み込みと計算部分を並列化を意識せずに開発し、最適化機構によって、ソース数と計算オペレータ数を最適化するようなプログラム変換を施し最高性能を得ます。

2. 実行時自動最適化: 1 は実行時前の最適化ですが、実行時にプロファイリングしながら最適化することも考えられます。こちらは難易度が高いですし、現行の System S ではできないでしょう。本研究室でデータストリーム処理系を作るとしたら、まず、このような最適化ができるような仕組みを作っておく必要があります。