プロセス間の同期・通信

Menu Menu

Java のスレッドを用いた通信と同期について調べる。

この実験では、Java 1.5 を用いる。


共有資源と競争状態

資源を共有して、作業する二つのスレッドを定義しよう。

以下のSimpleWorker は、共有資源のintをcount 回分、incrementする。Race Condition (競合状態) を明示的に起こすために yield(); を挿入しているが、これは、必須ではない。

Class ProcessExample.SimpleWorker

    package ProcessExample;
    public class SimpleWorker extends Thread {
	    String name;
	    int count;
	    int myResource;
	    SimpleWorker sharedResource;
	    
	    SimpleWorker() {
		    // empty
	    }
	    
	    public SimpleWorker(String _name,int count_,SimpleWorker w) {
		    String msg;
		    name = _name; sharedResource = w;
		    count = count_;
		    msg = "Thread "+name+" created.";
		    myResource = 0;
		    System.out.println(msg);
	    }
	    
	    public void run() {
		    while(count < 0 || count-- > 0) {
			    sharedResource.work();
		    }
	    }
	    public void work() {
		    int x;
		    yield();
		    x = 	myResource;
		    yield();
		    x = x + 1;
		    yield();
		    myResource = x;
		    yield();
	    }
	    
	    public void setResource(int x) {
		    myResource = x;
	    }
	    
	    public int getResource() {
		    return myResource;
	    }
	    
    }

あとで、このクラスを拡張するために、単純Constructorと、資源へのアクセサ (accessor method) が用意されている。

Class ThreadRace では、SimpeWorker の一つ(t1)を受動的な資源として用意し、t2, t3 から平行的にアクセスする。

Class ProcessExample.ThreadRace

    package ProcessExample;
    public class ThreadRace {
	    
	    static public void main(String [] args) throws InterruptedException {
	    
		    SimpleWorker t1 = new SimpleWorker("t1",-1,null); // does not run
		    SimpleWorker t2 = new SimpleWorker("t2",5,t1);
		    SimpleWorker t3 = new SimpleWorker("t3",5,t1);
       
		    t2.start();
		    t3.start();
		    t2.join(0);
		    t3.join(0);
		    String msg = "Thread t1 "+Integer.toString(t1.getResource());
		    System.out.println(msg);
		    
	    }
    }

join(0) では、対応するスレッドの終了を待っている。

このmain() メソッドを動かして、共有資源の値がどうなるかを調べよ。yield() を取り去った場合の動作はどうか?


相互排除

相互排除を実現するためには、そのためのハードウェア的なサポートが必要である。そのための基本的な命令が機械語レベルで用意されていることもある。

Java 言語では、synchronized という構文が用意されており、メソッド単位、ステートメント単位で、モニタを定義することが出来る。モニタを実現するのは、Java VM の役割である。

synchronized で得られるモニタは、インスタンスに対応して一つであり、他のスレッドがモニタを使用している場合は、そのスレッドがモニタを解放するまで待つことになる。

SimpleWorkerのmethod、あるいは、ステートメントに synchronized 構文を付加して、競合状態を避け、正しい結果が得られるように変更せよ。


モニタの及ぶ範囲

以下の例題は、何故か、うまく競合条件を避けるように動作しない。ThreadRace の初期化を

    SimpleWorker t1 = new BadWorker("t1",-1,null); // does not run
    SimpleWorker t2 = new BadWorker("t2",5,t1);
    SimpleWorker t3 = new BadWorker("t3",5,t1);

に変更して動作させよう。(BadWorker t1 である必要はない。何故か?)

Class ProcessExample.BadWorker

    package ProcessExample;
    public class BadWorker extends SimpleWorker {
	    public BadWorker(String _name,int count_,SimpleWorker w) {
		    String msg;
		    name = _name; sharedResource = w;
		    count = count_;
		    msg = "Thread "+name+" created.";
		    myResource = 0;
		    System.out.println(msg);
	    }
	    
	    public void run() {
		    while(count < 0 || count-- > 0) {
			    work();
		    }
	    }
	    
	    public synchronized void work() {
		    int x;
		    yield();
		    x = sharedResource.getResource();
		    yield();
		    x = x + 1;
		    yield();
		    sharedResource.setResource(x);
		    yield();
	    }
    }

これは何故か? モニタとオブジェクトの対応を明確にする図を書いて説明せよ。


生産者消費者問題

生産者消費者問題では、生産者(OurProducer), 消費者(OurConsumer)と、その間を調停するモニタ(OurThreadMonitor) の三つのスレッドからなる。モニタは、ここでは受動的にしか動かないので、start()する必然性はない。(が、一応、起動しておこう....)

Class ProcessExample.ThreadProdCons

    package ProcessExample;
    public class ThreadProdCons {
		    
	    static public void main(String [] args) throws InterruptedException {
		    OurThreadMonitor monitor = new OurThreadMonitor("m1",5);
		    Thread p1 = new OurProducer("p1",10,monitor);
		    Thread c1 = new OurConsumer("c1",monitor);
		    
		    monitor.start();
		    p1.start();
		    c1.start();
		    p1.join();
		    
		    c1.stop();
		    monitor.stop();
	    }
    }

ここで、5 は、モニタが持つバッファの大きさであり、Producer の10は、生産するデータの個数である。生産者の終了を待って、すべてのスレッドを停止させている。

Class ProcessExample.OurProducer

    package ProcessExample;
    public class OurProducer extends Thread {
	    OurThreadMonitor mon;
	    String name;
	    int counter;
	    
	    OurProducer(String name_,int counter_, OurThreadMonitor mon_) {
		    mon = mon_;
		    name = name_;
		    counter = counter_;
	    }
	    
	public void run() { // run method contains the thread code 
	    int item; 
	    while (counter>=0) { // producer loop 
		item = produce_item(); 
		mon.insert(item); 
	    } 
	} 
	private int produce_item() {  return counter--; } // actually produce 
    }

生産者は、モニタに対して、insert(item) でデータを引き渡す。

Class ProcessExample.OurConsumer

    package ProcessExample;
    public class OurConsumer extends Thread {
	    OurThreadMonitor mon;
	    String name;
	    
	    OurConsumer(String name_, OurThreadMonitor mon_) {
		    mon = mon_;
		    name = name_;
	    }
	public void run() { // run method contains the thread code 
	    int item; 
	    while (true) { // consumer loop 
		item = mon.remove(); 
		consume_item (item); 
	    } 
	}
	private void consume_item(int item) {
		    String msg = "Consumer "+name+" eats "+Integer.toString(item);
		    System.out.println(msg);
	    
	} // actually consume 
    }

消費者は、モニタから remove() でデータを取得する。

モニタは、計数セマフォとして動作する。条件付変数の代わりに、wait()/notify()を使っていることに注意しよう。

Class ProcessExample.OurThreadMonitor

    package ProcessExample;
    public class OurThreadMonitor extends Thread {
	    private int bufsize;
	private int buffer[];
	private int count, lo, hi; // counters and indices
	private String name;
	
	OurThreadMonitor(String _name,int _bufsize) {
		    name = _name;
		    bufsize = _bufsize;
		    buffer = new int[bufsize];
	}
	
	public synchronized void insert(int val) { 
	    if (count == bufsize) go_to_sleep(); // if the buffer is full, go to sleep 
	    buffer [hi] = val; // insert an item into the buffer 
	    hi = (hi + 1) % bufsize; // slot to place next item in 
	    count = count + 1; // one more item in the buffer now 
	    if (count == 1) notify(); // if consumer was sleeping, wake it up 
	} 
	public synchronized int remove() { 
	    int val; 
	    if (count == 0) go_to_sleep( ); // if the buffer is empty, go to sleep 
	    val = buffer [lo]; // fetch an item from the buffer 
	    lo = (lo + 1) % bufsize; // slot to fetch next item from 
	    count = count - 1 ; // one few items in the buffer 
	    if (count == bufsize - 1) notify(); // if producer was sleeping, wake it up
	    return val; 
	} 
	private void go_to_sleep() { try{wait( );} catch(InterruptedException exc
    ) {};} 
    }

このsynchronizedでは、OurThreadMonitorのインスタンスに対応するモニタを取得する。OurThreadMonitor がbuffer fullまたはbuffer emptyになると、insert/remove ともにブロックされることになる。実際にそういうことが起きるかどうかを調べよ。もし、起きるなら、これはバグであるので、fix せよ。起きないならば、その理由を、Java Language API の記述を引用して説明せよ。

ThreadProdCons は、生産者スレッドが終了すると、無条件に消費者を殺しにいく。これは、若干、拙速な行動である。消費者がデータを取り損なうことなく、終了するためには、どうすれば良いか? 可能な変更を提案し、実装せよ。


複数の生産者と複数の消費者

課題3では、生産者と消費者プロセスがそれぞれ1つづつしか生成されない。これを変更して、複数の生産者プロセスと複数の消費者プロセスを生成し、動作させなさい。

生産者プロセスと消費者プロセスの数は異なる場合が普通である。しかし、生産量と消費量がバランスしないとデッドロックに陥る可能性がある。

デッドロックを検出するためには、どうすれば良いか? 可能な方法を提案し、実装せよ。


Shinji KONO / Tue Dec 18 12:39:43 2007