본문 바로가기
CS/운영체제

병행 프로세스의 여러가지 문제들, 생산자-소비자, 판독기-기록기, 자바 구현 코드, 프로세스 간 통신

by Renechoi 2023. 6. 15.

1. 생산자-소비자 문제 

 

- 두 협력 프로세스 사이에 버퍼를 두고 생산자와 소비자의 상황을 다루는 문제

- 생산자: 데이터를 넣느 프로세스

- 소비자: 데이터를 꺼내는 프로세스 

 

버퍼에 여러 프로세스가 동시에 접근할 수 없음

- 버퍼에 데이터를 넣는 동안에는 데이터를 꺼낼 수 없음

- 버퍼에서 데이터를 꺼내는 동안에는 데이터를 넣을 수 없음 

-> 상호배제 필요 

 

버퍼의 크기가 유한 (유한 버퍼 문제) 

- 버퍼가 가득 찬 경우 생산자는 대기해야 함

- 버퍼가 빈 경우 소비자는 대기해야 함 

-> 동기화 필요 

 

 

상호배제: 세마포어 mutex를 이용 (초깃값 1) 

 

 

 

버퍼가 가득 차 경우 동기화: 세마포어 empty(초깃값 n) // n: 버퍼 크기 

 

 

 

 

버퍼가 빈 경우 동기화: 세마포어 full (초깃값 0) 

 

 

 

따라서 3개의 세마포어를 갖고 해결한다.

- mutex(1), empty(n), full(0) 

 

 

 

생산자 입장에서는 empty 여부를 확인해야 하는 것이고 소비자 입장에서는 full 여부를 확인해야 하는 것이다. 

 

 

자바 구현 코드

 

package os.semaphore.producerandconsumer;

import os.semaphore.Semaphore;

class Producer extends Thread {
    private Semaphore mutex;
    private Semaphore empty;
    private Semaphore full;
    private int id;

    public Producer(Semaphore mutex, Semaphore empty, Semaphore full, int id) {
        this.mutex = mutex;
        this.empty = empty;
        this.full = full;
        this.id = id;
    }

    public void run() {
        try {
            while (true) {
                // 생산 전에 빈 공간을 확인
                empty.acquire();
                mutex.acquire();

                // 생산 작업 수행
                System.out.println("생산자 " + id + "가 아이템을 생산합니다.");

                mutex.release();
                full.release();
                Thread.sleep(1000); // 임의의 작업 시간
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer extends Thread {
    private Semaphore mutex;
    private Semaphore empty;
    private Semaphore full;
    private int id;

    public Consumer(Semaphore mutex, Semaphore empty, Semaphore full, int id) {
        this.mutex = mutex;
        this.empty = empty;
        this.full = full;
        this.id = id;
    }

    public void run() {
        try {
            while (true) {
                // 소비 전에 채워진 아이템을 확인
                full.acquire();
                mutex.acquire();

                // 소비 작업 수행
                System.out.println("소비자 " + id + "가 아이템을 소비합니다.");

                mutex.release();
                empty.release();
                Thread.sleep(2000); // 임의의 작업 시간
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        Semaphore mutex = new Semaphore(1); // 상호 배제를 위한 mutex 세마포어
        Semaphore empty = new Semaphore(5); // 비어있는 슬롯 개수를 추적하는 empty 세마포어
        Semaphore full = new Semaphore(0); // 채워진 슬롯 개수를 추적하는 full 세마포어

        // 생산자와 소비자 생성
        Producer producer = new Producer(mutex, empty, full, 1);
        Consumer consumer = new Consumer(mutex, empty, full, 1);

        // 생산자와 소비자 실행
        producer.start();
        consumer.start();

        // 메인 스레드는 생산자와 소비자가 종료될 때까지 대기
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

Producer 클래스와 Consumer 클래스는 Thread를 상속받아 각각 생산자와 소비자 역할을 수행한다.

각각 mutex, empty, full 세마포어를 공유하며, 이를 통해 상호 배제 및 동기화 문제를 해결한다.


- mutex: 상호 배제를 위한 세마포어. 공유 자원인 버퍼에 동시에 접근하는 것을 제어. 크기가 1로 설정하였기 때문에 동시에 한 스레드만 버퍼에 접근할 수 있다.
- empty: 비어있는 슬롯의 개수를 추적하기 위한 세마포어. 생산자가 아이템을 생산하기 전에 빈 공간이 있는지 확인. 초깃값은 5로 설정되어 있어서 5개의 빈 슬롯을 나타낸다.
- full: 채워진 슬롯의 개수를 추적하기 위한 세마포어. 소비자가 아이템을 소비하기 전에 채워진 슬롯이 있는지 확인. 초깃값은 0으로 설정되어 있어서 채워진 슬롯이 없음을 나타낸다.

 

각각의 생산자와 소비자 스레드는 무한 루프를 실행한다.  생산자는 빈 공간(empty)을 확인한 후 mutex를 획득하고 아이템을 생산한 뒤 mutex와 full 세마포어를 해제한다. 소비자는 채워진 슬롯(full)을 확인한 후 mutex를 획득하고 아이템을 소비한 뒤 mutex와 empty 세마포어를 해제한다. 

 

 

 

 

 

2. 판독기-기록기 문제 

 

 

여러 협력 프로세스 사이에 공유자원을 두고 판독기와 기록기의 상황을 다루는 문제 

 

- 판독기: 읽기 프로세스

- 기록기: 쓰기 프로세스 

 

* 생산자 소비자의 경우 둘다 데이터에 대한 수정이 발생하게 되므로 기록기라고 보아야 한다. 

 

하나의 기록기가 공유자원에 데이터를 쓰는 중에는 다른 기록기나 판독기는 공유자원에 접근할 수 없다. 따라서 다음 두 가지의 오구사항이 있다.

- 공유자원에 데이터를 쓰는 동안에는 누구도 접근할 수 없음

- 공유자원에서 데이터를 읽는 동안에는 데이터를 쓸수 없음 

 

즉, 상호배제 필요. 

 

그런데 여러 판독기는 동시에 공유자원에서 데이터를 읽을 수도 있다. 따라서 조건을 보다 더 세분화해주어야 한다. 

 

i) 판독기가 읽는 중 새로운 판독기 읽기 시도 -> 가능 

ii) 판독기가 읽는 중 기록기가 대기하고 있을 때 -> 새로운 판독기가 읽기를 시도한다면 -> 가능 or 불가능 

 

ii)에 대한 결정은 정책적 결정에 따르는 것으로 상황에 따라 다르게 적용할 수 있다. 

 

 

 

제 1판독기 문제

- 판독기에 우선순위를 주는 케이스

- 즉, 새로운 판독기는 즉시 공유자원에 접근 가능하다. 

- 문제점 : 기록기의 기아 상태 유발 가능

-> 판독기가 계속 들어오면 기록기는 계속 기다려야만 할 수 있다. 

 

일반변수 rCount(초깃값 0)과 세마포어 mutex(초깃값 1)을 설정한다. 

 

 

 

판독기가 읽으면 ++; 

-> 보다 큰 값으로 올린다.

-> 다른 판독기는 그냥 통과해서 읽을 수 있게 된다. 

-> 후에 v 연산으로 상호배제 자원을 해제하면

-> v 연산을 통해 기록기가 들어 올 수 있게 된다. 

 

이때 rCount = rCount +1;와 p(wrt) 하는 부분도 하나의 임계 영역을 보아야 한다. 

동시 쓰레드가 진행하던 중에 rcount++;이 반영되기도 전 시점에 if를 읽어서 동시성 문제가 발생할 수 있다. 

 

 

 

 

자바 구현 코드 

 

package os.semaphore.readersandwriters.no1;

import os.semaphore.Semaphore;

class SharedResource {
    private Semaphore wrt;
    private Semaphore mutex;
    private int rCount;

    public SharedResource() {
        wrt = new Semaphore(1);
        mutex = new Semaphore(1);
        rCount = 0;
    }

    public void writeData() {
        try {
            wrt.acquire();

            // 공유 자원에 데이터 쓰기
            System.out.println("기록기가 공유 자원에 데이터를 씁니다.");

            wrt.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * rCount를 설정하는 부분을 임계영역으로 설정하는 이유:
     * rCount++;를 통해 1을 증가시키지만, 스레드가 동시적으로 수행되면서 "==1" 을 거치기 전에 바로 2가 되어 버리는 경우가 생길 수 있다.
     * 따라서 rCount 변수를 설정하고 if 문을 수행하는 과정은 판독기 스레드들이 공유 자원에 접근하기 전에 실행되어야 한다.
     * mutex.acquire()를 호출하여 임계 영역을 설정하여
     * rCount 변수와 if 문이 원자적(atomic)으로 실행되고 다른 스레드에 의해 방해받지 않는 환경을 보장한다.
     */

    public void readData() {
        try {
            mutex.acquire();
            rCount++;
            if (rCount == 1) {
                wrt.acquire();
            }
            mutex.release();

            // 공유 자원에서 데이터 읽기
            System.out.println("판독기가 공유 자원에서 데이터를 읽습니다.");

            mutex.acquire();
            rCount--;
            if (rCount == 0) {
                wrt.release();
            }
            mutex.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Writer extends Thread {
    private SharedResource sharedResource;

    public Writer(SharedResource sharedResource) {
        this.sharedResource = sharedResource;
    }

    public void run() {
        sharedResource.writeData();
    }
}

class Reader extends Thread {
    private SharedResource sharedResource;

    public Reader(SharedResource sharedResource) {
        this.sharedResource = sharedResource;
    }

    public void run() {
        sharedResource.readData();
    }
}

public class ReaderWriter {
    public static void main(String[] args) {
        SharedResource sharedResource = new SharedResource();

        // 기록기 생성 및 실행
        Writer writer = new Writer(sharedResource);
        writer.start();

        // 판독기 생성 및 실행
        Reader reader1 = new Reader(sharedResource);
        Reader reader2 = new Reader(sharedResource);
        reader1.start();
        reader2.start();

        // 메인 스레드는 모든 스레드가 종료될 때까지 대기
        try {
            writer.join();
            reader1.join();
            reader2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

- wrt: 상호 배제를 위한 세마포어로, 기록 작업을 수행하는 동안 다른 스레드의 접근을 막는다. 초깃값은 1로 설정되어 한 번에 한 개의 기록 작업만 허용.

- mutex: 상호 배제를 위한 세마포어로, 판독자의 rCount 변수를 조작하는 과정을 임계 영역으로 설정하여 동시 접근 문제를 해결. 초깃값은 1로 설정하여 한 번에 한 스레드만 mutex를 획득할 수 있다.
- rCount: 판독자 스레드의 수를 추적하는 변수로, 판독자 스레드들이 공유 자원에 접근하는 동안의 동기화를 담당. 초깃값은 0으로 설정.

 

판독자(Reader)는 SharedResource의 readData() 메서드를 호출하여 공유 자원에서 데이터를 판독한다. 판독 작업을 수행하기 전에 mutex를 획득하고 rCount 변수를 조작하여 판독자의 수를 추적한다. rCount가 1인 경우, 기록자가 접근하지 않도록 wrt 세마포어를 획득. 데이터를 판독한 후에는 다시 mutex를 획득하여 rCount 변수를 조작하고, rCount가 0인 경우에만 wrt 세마포어를 해제한다.

 

 

 

 

 

 

제 2 판독기-기록기 문제

-> 기록기에 우선순위를 주는 경우 

-> 즉, 대기 중인 기록기가 있다면 새로운 판독기는 공유자원에 접근이 불가능하다. 

 

문제점

-> 판독기의 병행성이 떨어진다.

-> 마찬가지로 판독기의 기아 상태 유발 가능성 

 

 

총 5개의 세마포어를 이용한다. 

 

자바 구현 코드 

 

package os.semaphore.readersandwriters.no2;

import os.semaphore.Semaphore;

class SharedResource {
    private Semaphore rd;
    private Semaphore wrt;
    private Semaphore mutex1;
    private Semaphore mutex2;
    private Semaphore mutex3;
    private int rCount;
    private int wCount;

    public SharedResource() {
        rd = new Semaphore(1);
        wrt = new Semaphore(1);
        mutex1 = new Semaphore(1);
        mutex2 = new Semaphore(1);
        mutex3 = new Semaphore(1);
        rCount = 0;
        wCount = 0;
    }

    public void writeData() {
        try {
            mutex2.acquire();
            wCount++;
            if (wCount == 1) {
                rd.acquire();
            }
            mutex2.release();

            wrt.acquire();

            // 공유 자원에 데이터 쓰기
            System.out.println("기록기가 공유 자원에 데이터를 씁니다.");

            wrt.release();

            mutex2.acquire();
            wCount--;
            if (wCount == 0) {
                rd.release();
            }
            mutex2.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void readData() {
        try {
            mutex3.acquire();
            rd.acquire();
            mutex1.acquire();
            rCount++;
            if (rCount == 1) {
                wrt.acquire();
            }
            mutex1.release();
            rd.release();
            mutex3.release();

            // 공유 자원에서 데이터 읽기
            System.out.println("판독기가 공유 자원에서 데이터를 읽습니다.");

            mutex1.acquire();
            rCount--;
            if (rCount == 0) {
                wrt.release();
            }
            mutex1.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Writer extends Thread {
    private SharedResource sharedResource;

    public Writer(SharedResource sharedResource) {
        this.sharedResource = sharedResource;
    }

    public void run() {
        sharedResource.writeData();
    }
}

class Reader extends Thread {
    private SharedResource sharedResource;

    public Reader(SharedResource sharedResource) {
        this.sharedResource = sharedResource;
    }

    public void run() {
        sharedResource.readData();
    }
}

public class ReaderWriter {
    public static void main(String[] args) {
        SharedResource sharedResource = new SharedResource();

        // 기록기 생성 및 실행
        Writer writer = new Writer(sharedResource);
        writer.start();

        // 판독기 생성 및 실행
        Reader reader1 = new Reader(sharedResource);
        Reader reader2 = new Reader(sharedResource);
        reader1.start();
        reader2.start();

        // 메인 스레드는 모든 스레드가 종료될 때까지 대기
        try {
            writer.join();
            reader1.join();
            reader2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

 

 

 

3. 프로세스 간 통신 

 

 

 

- InterProcess Communication 

- 병행 프로세스가 데이터를 서로 공유하는 방법

  -> 공유 메모리 방법

  -> 메시지 전달 방법 

- 하나의 운영체제에서 두 방법을 함께 사용할 수 있다. 

 

 

 

공유 메모리 방법

- 협력 프로세스가 동일한 변수를 사용

 

예시) 

- 생산자-소비자 문제의 유한 버퍼

- 판독기-기록기 문제의 공유자원 

 

- 대량 데이터 교환: 고속 통신 가능

- 통신상 발생 가능 문제 해결: 응용 프로그래머 역할 

 

메시지 전달 방법 

- 협력 프로세스가 메시지를 주도 받는다: send(), receive(), 시스템 호출 

- 소량 데이터 교환에 적합

- 통신상 문제 해결: 운영체제 역할 

 

 

 

 

 

메시지 전달 방법의 논리적 구조 

 

통신링크 

- 메시지가 지나다니는 통로 

- 연결 대상: 두 프로세스, 세 프로세스 

- 두 프로세스 사이 링크 개수: 하나, 둘 이상

- 방향성: 단방향, 양방향

- 용량: 무한, 유한, 0

 

 

용량 무한 

=> 송신자는 대기 없음 

 

용량 유한 

=> 송신자는 큐가 가득 차면 대기 

 

용량 0 

=> 송신자는 수신자가 메시지를 직접 받을 수 있을 때까지 대기 

 

 

직접 통신: 두 프로세스가 직접 서로를 지정하여 메시지 전달 

 

 

- 오직 하나의 통신 링크가 자동 설정

- 하나의 통신 링크는 오직 두 프로세스 사이에만 연관

- 통신 링크는 양방향 

 

 

- 대칭형 주소 지정 

  -> send(B, m) <-> receive(A, m); 

 

- 비대칭형 주소 지정 

  -> send(B, m) <-> receive(id,m)

  -> 누구한테 받을지 모름 

  -> 수신자가 여러 송신자와 통신 링크를 갖는 경우 사용 

 

 

간접 통신

- 프로세스 사이에 우편함을 통해 메시지 전달 

- send(X, m) <-> receive(X, m); 

 

- 같은 우편함을 이용하는 경우 통신 링크 설정

- 여러 우편함을 이용하면 여러 개의 통신 링크 존재

- 하나의 통신 링크가 여러 프로세스와 연관 가능

- 통신 링크는 단방향 또는 양방향 

 

 

우편함이 수신 프로세스에 소속하는 경우

- 수신자 하나

- 통신링크 단방향 

- 수신 프로세스가 종료시 우편함도 사라짐 

 

우편함이 운영체제에 소속되는 경우

- 수신자 여럿

- 한순간에 하나의 수신자만 가능

- 운영체제가 수신자 관리

- 통신 링크 양방향 

 

 

 


참고자료: 운영체제(김진욱, 이인복 공저, KNOU press 출판) 

반응형