(1)本實驗的多個緩沖區不是環形循環的,也不要求按順序訪問。生產者可以把產品放到目前某壹個空緩沖區中。
(2)消費者只消費指定生產者的產品。
(3)在測試用例文件中指定了所有的生產和消費的需求,只有當***享緩沖區的數據滿足了所有關於它的消費需求後,此***享緩沖區才可以作為空閑空間允許新的生產者使用。
(4)本實驗在為生產者分配緩沖區時各生產者間必須互斥,此後各個生產者的具體生產活動可以並發。而消費者之間只有在對同壹產品進行消費時才需要互斥,同時它們在消費過程結束時需要判斷該消費對象是否已經消費完畢並清除該產品。
Windows 用來實現同步和互斥的實體。在Windows 中,常見的同步對象有:信號量(Semaphore)、
互斥量(Mutex)、臨界段(CriticalSection)和事件(Event)等。本程序中用到了前三個。使用這些對象都分
為三個步驟,壹是創建或者初始化:接著請求該同步對象,隨即進入臨界區,這壹步對應於互斥量的
上鎖;最後釋放該同步對象,這對應於互斥量的解鎖。這些同步對象在壹個線程中創建,在其他線程
中都可以使用,從而實現同步互斥。當然,在進程間使用這些同步對象實現同步的方法是類似的。
1.用鎖操作原語實現互斥
為解決進程互斥進人臨界區的問題,可為每類臨界區設置壹把鎖,該鎖有打開和關閉兩種狀態,進程執行臨界區程序的操作按下列步驟進行:
①關鎖。先檢查鎖的狀態,如為關閉狀態,則等待其打開;如已打開了,則將其關閉,繼續執行步驟②的操作。
②執行臨界區程序。
③開鎖。將鎖打開,退出臨界區。
2.信號量及WAIT,SIGNAL操作原語
信號量的初值可以由系統根據資源情況和使用需要來確定。在初始條件下信號量的指針項可以置為0,表示隊列為空。信號量在使用過程中它的值是可變的,但只能由WAIT,SIGNAL操作來改變。設信號量為S,對S的WAIT操作記為WAIT(S),對它的SIGNAL操作記為SIGNAL(S)。
WAIT(S):順序執行以下兩個動作:
①信號量的值減1,即S=S-1;
②如果S≥0,則該進程繼續執行;
如果 S(0,則把該進程的狀態置為阻塞態,把相應的WAITCB連人該信號量隊列的末尾,並放棄處理機,進行等待(直至其它進程在S上執行SIGNAL操作,把它釋放出來為止)。
SIGNAL(S):順序執行以下兩個動作
①S值加 1,即 S=S+1;
②如果S)0,則該進程繼續運行;
如果S(0則釋放信號量隊列上的第壹個PCB(既信號量指針項所指向的PCB)所對應的進程(把阻塞態改為就緒態),執行SIGNAL操作的進程繼續運行。
在具體實現時註意,WAIT,SIGNAL操作都應作為壹個整體實施,不允許分割或相互穿插執行。也就是說,WAIT,SIGNAL操作各自都好像對應壹條指令,需要不間斷地做下去,否則會造成混亂。
從物理概念上講,信號量S)時,S值表示可用資源的數量。執行壹次WAIT操作意味著請求分配壹個單位資源,因此S值減1;當S<0時,表示已無可用資源,請求者必須等待別的進程釋放了該類資源,它才能運行下去。所以它要排隊。而執行壹次SIGNAL操作意味著釋放壹個單位資源,因此S值加1;若S(0時,表示有某些進程正在等待該資源,因而要把隊列頭上的進程喚醒,釋放資源的進程總是可以運行下去的。
---------------
/**
* 生產者
*
*/
public class Producer implements Runnable{
private Semaphore mutex,full,empty;
private Buffer buf;
String name;
public Producer(String name,Semaphore mutex,Semaphore full,Semaphore empty,Buffer buf){
this.mutex = mutex;
this.full = full;
this.empty = empty;
this.buf = buf;
this.name = name;
}
public void run(){
while(true){
empty.p();
mutex.p();
System.out.println(name+" inserts a new product into "+buf.nextEmptyIndex);
buf.nextEmptyIndex = (buf.nextEmptyIndex+1)%buf.size;
mutex.v();
full.v();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
---------------
/**
* 消費者
*
*/
public class Customer implements Runnable{
private Semaphore mutex,full,empty;
private Buffer buf;
String name;
public Customer(String name,Semaphore mutex,Semaphore full,Semaphore empty,Buffer buf){
this.mutex = mutex;
this.full = full;
this.empty = empty;
this.buf = buf;
this.name = name;
}
public void run(){
while(true){
full.p();
mutex.p();
System.out.println(name+" gets a product from "+buf.nextFullIndex);
buf.nextFullIndex = (buf.nextFullIndex+1)%buf.size;
mutex.v();
empty.v();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
-------------------------
/**
* 緩沖區
*
*/
public class Buffer{
public Buffer(int size,int nextEmpty,int nextFull){
this.nextEmptyIndex = nextEmpty;
this.nextFullIndex = nextFull;
this.size = size;
}
public int size;
public int nextEmptyIndex;
public int nextFullIndex;
}
-----------------
/**
* 此類用來模擬信號量
*
*/
public class Semaphore{
private int semValue;
public Semaphore(int semValue){
this.semValue = semValue;
}
public synchronized void p(){
semValue--;
if(semValue<0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void v(){
semValue++;
if(semValue<=0){
this.notify();
}
}
}
------------------------
public class Test extends Thread
{
public static void main(String[] args)
{
Buffer bf=new Buffer(10,0,0);
Semaphore mutex=new Semaphore(1);
Semaphore full=new Semaphore(0);
Semaphore empty=new Semaphore(10);
//new Thread(new Producer("p001",mutex,full,empty,bf)).start();
Producer p=new Producer("p001",mutex,full,empty,bf);
new Thread(new Producer("p002",mutex,full,empty,bf)).start();
new Thread(new Producer("p003",mutex,full,empty,bf)).start();
new Thread(new Producer("p004",mutex,full,empty,bf)).start();
new Thread(new Producer("p005",mutex,full,empty,bf)).start();
try{
sleep(3000);
}
catch(Exception ex)
{
ex.printStackTrace();
}
new Thread(new Customer("c001",mutex,full,empty,bf)).start();
new Thread(new Customer("c002",mutex,full,empty,bf)).start();
new Thread(new Customer("c003",mutex,full,empty,bf)).start();
new Thread(new Customer("c004",mutex,full,empty,bf)).start();
new Thread(new Customer("c005",mutex,full,empty,bf)).start();
}
}
--------------------------------------------