1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
import java.util.Random;
/**
 * Thread that repeatedly takes a {@link Request} from a {@link Channel} and executes it.
 */
class WorkerThread extends Thread {
    /** Channel this worker takes its requests from. */
    private final Channel channel;

    /**
     * Creates a worker; the thread is not started here.
     *
     * @param name    thread name, forwarded to the {@link Thread} constructor
     * @param channel channel whose {@code takeRequest()} supplies the work
     */
    WorkerThread(String name, Channel channel) {
        super(name);
        this.channel = channel;
    }

    /**
     * Takes one request at a time from the channel and executes it, until the
     * running thread's interrupt status is found set between two requests.
     */
    @Override
    public void run() {
        // An unconditional loop gave callers no way to stop a worker. The flag
        // is checked only between requests, so a request that has been taken
        // is always executed; the check has an effect only when the interrupt
        // status is still set at this point.
        while (!Thread.currentThread().isInterrupted()) {
            Request request = channel.takeRequest();
            request.execute();
        }
    }
}
/**
 * Bounded FIFO queue of {@link Request}s that also owns the {@link WorkerThread}s
 * consuming it.
 *
 * <p>Requests are stored in a fixed-size circular array. {@link #putRequest} blocks
 * while the array is full and {@link #takeRequest} blocks while it is empty; both
 * are {@code synchronized} on this instance and use {@code wait()}/{@code notifyAll()}
 * on it.
 */
class Channel {
    /** Capacity of the circular request buffer. */
    private static final int MAX_REQUEST = 100;

    /** Circular buffer holding the queued requests. */
    private final Request[] requestQueue;
    /** Index of the slot the next {@link #putRequest} writes to. */
    private int tail;
    /** Index of the slot the next {@link #takeRequest} reads from. */
    private int head;
    /** Number of requests currently queued. */
    private int count;

    /** Workers created by the constructor and started by {@link #startWorkers()}. */
    private final WorkerThread[] threadPool;

    /**
     * Creates an empty channel and its worker threads; the workers are created
     * but not started.
     *
     * @param threads number of worker threads to create
     */
    Channel(int threads) {
        this.requestQueue = new Request[MAX_REQUEST];
        this.head = 0;
        this.tail = 0;
        this.count = 0;

        threadPool = new WorkerThread[threads];
        for (int i = 0; i < threadPool.length; i++) {
            threadPool[i] = new WorkerThread("Worker-" + i, this);
        }
    }

    /** Starts every worker thread created by the constructor. */
    void startWorkers() {
        for (int i = 0; i < threadPool.length; i++) {
            threadPool[i].start();
        }
    }

    /**
     * Appends a request to the queue, blocking while the queue is full.
     *
     * <p>The wait is not abandoned on interruption: the request is always
     * enqueued. If the calling thread was interrupted while waiting, its
     * interrupt status is set again before this method returns.
     *
     * @param request request to enqueue
     */
    synchronized void putRequest(Request request) {
        boolean interrupted = false;
        try {
            while (count >= requestQueue.length) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt instead of dropping it. It is
                    // re-asserted only after the loop, because re-asserting
                    // here would make the next wait() throw immediately.
                    interrupted = true;
                }
            }
            requestQueue[tail] = request;
            tail = (tail + 1) % requestQueue.length;
            count++;
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the oldest queued request, blocking while the queue
     * is empty.
     *
     * <p>The wait is not abandoned on interruption: a request is always
     * returned. If the calling thread was interrupted while waiting, its
     * interrupt status is set again before this method returns.
     *
     * @return the request that has been queued the longest
     */
    synchronized Request takeRequest() {
        boolean interrupted = false;
        try {
            while (count <= 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // See putRequest: record now, re-assert after the loop.
                    interrupted = true;
                }
            }
            Request request = requestQueue[head];
            // Clear the slot so the buffer does not keep the request reachable
            // after it has been handed out.
            requestQueue[head] = null;
            head = (head + 1) % requestQueue.length;
            count--;
            notifyAll();
            return request;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * A unit of work identified by the name of its originator and a sequence number.
 */
class Request {
    /** Name of the originator, as passed to the constructor. */
    private final String name;
    /** Sequence number, as passed to the constructor. */
    private final int number;
    /** Source of the simulated execution time, shared by all requests. */
    private static final Random random = new Random();

    /**
     * @param name   name of the originator
     * @param number sequence number of this request
     */
    Request(String name, int number) {
        this.name = name;
        this.number = number;
    }

    /**
     * Prints which thread executes this request, then sleeps for a random
     * duration below one second to simulate work.
     *
     * <p>If the sleep is interrupted the method returns early and leaves the
     * calling thread's interrupt status set.
     */
    void execute() {
        System.out.println(Thread.currentThread().getName() + " executes " + this);
        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt status when it throws; set it
            // again so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return a description of the form <code>{ Request from NAME No.NUMBER }</code>
     */
    @Override
    public String toString() {
        return "{ Request from " + name + " No." + number + " }";
    }
}
/**
 * Thread that keeps submitting numbered {@link Request}s to a {@link Channel},
 * pausing for a random time below one second after each submission.
 */
class ClientThread extends Thread {
    /** Source of the random pause between submissions, shared by all clients. */
    private static final Random random = new Random();

    /** Channel the requests are submitted to. */
    private final Channel channel;

    /**
     * @param name    thread name; also used as the originator name of each request
     * @param channel channel to submit requests to
     */
    ClientThread(String name, Channel channel) {
        super(name);
        this.channel = channel;
    }

    /**
     * Submits requests numbered 0, 1, 2, ... until the pause between two
     * submissions is interrupted, then prints the stack trace and returns.
     */
    @Override
    public void run() {
        int sequence = 0;
        try {
            while (true) {
                channel.putRequest(new Request(getName(), sequence));
                Thread.sleep(random.nextInt(1000));
                sequence++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Entry point: wires one {@link Channel} with five workers to three client threads.
 */
public class Test {
    /**
     * Creates the channel, starts its workers, then starts the clients
     * "Alice", "Bobby" and "Chris" in that order.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Channel channel = new Channel(5);
        channel.startWorkers();

        String[] clientNames = {"Alice", "Bobby", "Chris"};
        for (String clientName : clientNames) {
            new ClientThread(clientName, channel).start();
        }
    }
}
|