PART.1 无法中断的线程
一个无法中断的线程的例子。
- public class UninterruptableThread
- {
- @SuppressWarnings("deprecation")
- public static void main(String[] args) throws Exception
- {
- Thread th = new Thread(new TestRunnable());
- // 启动线程
- System.out.println("main: start thread.");
- th.start();
- // 等待2秒
- Thread.sleep(2000);
- // 中断线程
- System.out.println("main: interrupt thread.");
- th.interrupt();
- // 等待2秒
- Thread.sleep(2000);
- // 停止线程
- System.out.println("main: stop thread.");
- th.stop();
- }
- private static class TestRunnable implements Runnable
- {
- @Override
- public void run()
- {
- System.out.println("Thread started.");
- while (true)
- {
- // 避免由于while(true)导致编译失败。
- if (false) break;
- }
- // 清理工作
- System.out.println("Thread ended.");
- }
- }
- }
- 输出结果:
main: start thread.Thread started.main: interrupt thread.main: stop thread.
- Thread对象的interrupt方法仅仅把该线程的一个标志置为true,该方法本身并不包含任何中断线程的操作。
- stop方法可以将线程中止,但通过输出的结果可以发现,”Thread ended”并没有被输出,即线程本身不能进行任何清理工作。
PART.2 可中断的线程
- 线程应不断检查isInterrupted是否为true,当其返回true时,应开始清理并结束线程。(Test1中的while循环)
- Thread.sleep方法会在线程被中断时抛出InterruptedException,线程可以捕获该异常并开始清理和结束线程。(Test2中的Thread.sleep())
- 如果循环中不时调用Thread.sleep,可以处理isInterrupted。
可中断线程的例子:
- public class GeneralTest
- {
- public static void main(String[] args) throws Exception
- {
- Thread th1 = new Thread(new Test1());
- Thread th2 = new Thread(new Test2());
- // 启动 Test1 和 Test2 线程。
- System.out.println("main: starting 'Test1' and 'Test2'.");
- th1.start();
- th2.start();
- // 等待3秒。
- System.out.println("main: sleeping for 3 seconds.");
- Thread.sleep(3000);
- // 中断 Test1 和 Test2 线程。
- System.out.println("main: interrupting 'Test1' and 'Test2'.");
- th1.interrupt();
- th2.interrupt();
- // 等待 Test1 和 Test2 线程结束。
- System.out.println("main: waiting for 'Test1' and 'Test2' to end.");
- th1.join();
- th2.join();
- System.out.println("main: end.");
- }
- private static class Test1 implements Runnable
- {
- @Override
- public void run()
- {
- System.out.println("Test1: start.");
- while (!Thread.currentThread().isInterrupted())
- {
- // 其他操作...
- System.out.print("");
- }
- System.out.println("Test1: end.");
- }
- }
- private static class Test2 implements Runnable
- {
- @Override
- public void run()
- {
- System.out.println("Test2: start.");
- try
- {
- while (true)
- {
- // 其他操作...
- Thread.sleep(1000);
- }
- }
- catch (InterruptedException e)
- {
- System.out.println("Test2: InterruptedException");
- }
- System.out.println("Test2: end.");
- }
- }
- }
PART.3 isInterrupted()和interrputed()方法的区别
- isInterrupted方法是实例方法,interrupted方法是静态方法。
Thread.currentThread().isInterrupted()Thread.interrupted()
- interrupted方法在调用之后会将中断标志置为false。在只对线程调用一次interrupt的前提下interrupted方法只会返回一次true。
- 使用interrupted方法判断应确保在判断之后开始结束线程。
isInterrupted和interrupted方法比较的例子:
- public class InterruptedStateTest
- {
- public static void main(String[] args) throws Exception
- {
- // "Test1"
- Thread th1 = new Thread(new Test1());
- // 启动 Test1 线程,并在3秒之后中断该线程。
- th1.start();
- Thread.yield();
- System.out.println("Test1 started... Waiting 3 seconds.");
- Thread.sleep(3000);
- System.out.println("Interrupting Test1...");
- th1.interrupt();
- Thread.sleep(1000);
- System.out.println("---------------------------------------");
- // “Test2"
- Thread th2 = new Thread(new Test2());
- // 启动 Test2 线程,并在3秒之后中断该线程。
- th2.start();
- Thread.yield();
- System.out.println("Test2 started... Waiting 3 seconds.");
- Thread.sleep(3000);
- System.out.println("Interrupting Test2...");
- th2.interrupt();
- Thread.yield();
- // 主线程结束。
- System.out.println("End of main.");
- }
- private static class Test1 implements Runnable
- {
- @Override
- public void run()
- {
- System.out.println("Test1 start...");
- while (true)
- {
- if (Thread.currentThread().isInterrupted())
- {
- if (Thread.currentThread().isInterrupted())
- {
- System.out.println("Interrupted...");
- break;
- }
- }
- }
- System.out.println("Test1 end...");
- }
- }
- private static class Test2 implements Runnable
- {
- @Override
- public void run()
- {
- // 记录线程开始时间。
- long startTime = System.currentTimeMillis();
- System.out.println("Test2 start... " +
- "Automatically ends in 6 sec.");
- while (true)
- {
- // 连续判断2次Thread.interrupted()
- if (Thread.interrupted())
- {
- if (Thread.interrupted())
- {
- System.out.println("Interrupted...");
- break;
- }
- }
- // 如果线程2运行超过6秒将自动结束。
- if (System.currentTimeMillis() - startTime > 6000 )
- {
- System.out.println("5 seconds...");
- break;
- }
- }
- System.out.println("Test2 end");
- }
- }
- }
例子中Test1连续判断2次Thread.currentThread().isInterrupted(), Test1仍然可以正常中断。
- if (Thread.currentThread().isInterrupted())
- {
- if (Thread.currentThread().isInterrupted())
- {
- // 结束线程。
- }
- }
Test2连续判断2次Thread.interrupted(),因此Test2线程在被调用interrupt之后没有结束。
- if (Thread.interrupted())
- {
- if (Thread.interrupted())
- {
- // 结束线程。
- }
- }
PART.4 处理阻塞
阻塞操作如BufferedReader的readLine方法,ServerSocket的accept方法将导致线程不能判断isInterrupted(),因此线程中的阻塞不能永久阻塞。处理阻塞的方法有以下:
- 在调用阻塞方法时设置超时时间:
类 | 方法 | 超时后的处理 |
ReentrantLock ReadLock WriteLock | tryLock(long, TimeUnit) | 返回false |
Condition | await(long, TimeUnit) awaitNanos(long) awaitUntil(Date) | 返回false |
Future | get(long, TimeUnit) | TimeoutException |
CyclicBarrier | await(long, TimeUnit) | TimeoutException |
CountDownLatch | await(long, TimeUnit) | 返回false |
Exchanger | exchange(V, long, TimeUnit) | TimeoutException |
Semaphore | tryAcquire(long, TimeUnit) | 返回false |
BlockingQueue<E> | offer(E, long, TimeUnit) poll(long, TimeUnit) | 返回false 返回null |
BlockingDeque<E> | offerFirst(E, long, TimeUnit) offerLast(E, long, TimeUnit) poolFirst(long, TimeUnit) poolLast(long, TimeUnit) | 返回false
返回null |
ServerSocket | accept() 通过setSoTimeout设置超时时间。 | SocketTimeoutException |
- 该方法在阻塞时如果线程被中断,可以抛出一个异常:
类 | 方法 | 异常 |
Thread | sleep(long) join() | InterruptedException
|
ReentrantLock ReadLock WriteLock | lockInterruptibly() | InterruptedException |
Condition | await() | InterruptedException |
Future | get() | InterruptedException |
CyclicBarrier | await() | InterruptedException |
CountDownLatch | await() | InterruptedException |
Exchanger | exchange(V) | InterruptedException |
Semaphore | acquire() | InterruptedException |
BlockingQueue<E> | put(E) take() | InterruptedException |
BlockingDeque<E> | putFirst(E) putLast(E) takeFirst(E) takeLast(E) | InterruptedException |
- 调用不可中断阻塞方法的可中断版本:
类 | 阻塞方法 | 可中断方法 |
ReentrantLock ReadLock WriteLock | lock() | tryLock() tryLock(long, TimeUnit) lockInterruptibly() |
Condition | awaitUninterruptibly() | await() await(long, TimeUnit) awaitNanos(long) awaitUntil(Date) |
Semaphore | acquireUninterruptibly() | acquire() tryAcquire() tryAcquire(long, TimeUnit) |
- 不能设置超时也不能抛出异常的阻塞方法:
- synchronized块,Object的wait方法。可以使用ReentrantLock和Condition替代。
- BufferedReader的readLine方法,ObjectInputStream得readObject方法。(如果底层流是通过Socket获得,可以通过Socket设置超时)
PART.5 处理Thread.sleep()
1. 捕获异常并结束线程
- 捕获InterruptedException异常,开始清理操作并结束线程。
- public void run()
- {
- try
- {
- while (true)
- {
- // 需要进行的操作
- Thread.sleep(500);
- }
- }
- catch (InterruptedException e)
- {
- }
- // 清理操作
- }
2. 捕获异常,再次调用interrupt
- public void run()
- {
- while (!Thread.currentThread().isInterrupted())
- {
- try
- {
- // 需要进行的操作 1
- Thread.sleep(500);
- }
- catch (InterruptedException e)
- {
- // 再次调用interrupt
- Thread.currentThread().interrupt();
- }
- try
- {
- // 需要进行的操作 2
- Thread.sleep(500);
- }
- catch (InterruptedException e)
- {
- // 再次调用interrupt
- Thread.currentThread().interrupt();
- }
- }
- // 清理操作
- }
PART.6 处理ReentrantLock和Condition
1. 通过lockInterruptibly方法中断
- 捕获lockInterruptibly方法可能跑出的InterruptedException,并结束线程。
- public void run()
- {
- try
- {
- while (true)
- {
- locker.lockInterruptibly();
- // 其他操作
- locker.unlock();
- }
- }
- catch (InterruptedException e)
- {
- }
- // 清理操作
- }
2. 通过不设置超时的tryLock方法中断
- tryLock方法将不阻塞。
- 通过捕获Thread.sleep的异常(或其他方法)中断线程。
- public void run()
- {
- try
- {
- while (true)
- {
- if (locker.tryLock())
- {
- // 其他操作
- }
- Thread.sleep(500);
- }
- }
- catch (InterruptedException e)
- {
- }
- }
3. 通过设置超时的tryLock方法中断
- 捕获tryLock方法在线程中断时抛出的InterrupedException并结束线程。
- public void run()
- {
- try
- {
- while (true)
- {
- if (locker.tryLock(1, TimeUnit.SECONDS))
- {
- // 其他操作
- }
- }
- }
- catch (InterruptedException e)
- {
- }
- // 清理操作
- }
condition.await();
- // 其他操作
- locker.unlock();
- }
- }
- catch (InterruptedException e)
- {
- }
- // 清理操作
- }
5. 可中断的Producer和Consumer示例
- produce方法在线程中断时将跑出InterruptedException。
- run方法捕获该异常,并中断线程。
- public void run()
- {
- try
- {
- while (true)
- {
- produce();
- Thread.sleep((int)(Math.random() * 3000));
- }
- }
- catch (InterruptedException e)
- {
- }
- System.out.println("producer: end.");
- }
- private void produce() throws InterruptedException
- {
- locker.lockInterruptibly();
- try
- {
- // Produce
- int x = (int)(Math.random() * 100);
- queue.offer(x);
- emptyCondition.signalAll();
- System.out.printf("producer: %d and signal all. queue = %d /n",
- x, queue.size());
- }
- finally
- {
- locker.unlock();
- }
- }
- Consumer线程被中断后不结束,直到队列内所有数据被输出。
- 不使用Thread.currentThread().isInterrupted()而定义isInt记录中断,可以避免中断导致ReentrantLock方法的tryLock不能获得锁而直接抛出异常。
- public void run()
- {
- // 线程是否被中断
- boolean isInt = false;
- // 线程中断后,将队列内的数据全部读出,再结束线程。
- while (!isInt || !queue.isEmpty())
- {
- try
- {
- consume();
- Thread.sleep((int)(Math.random() * 3000));
- }
- catch (InterruptedException e)
- {
- isInt = true;
- }
- }
- System.out.println("consumer: end.");
- }
- private void consume() throws InterruptedException
- {
- if (!locker.tryLock(5, TimeUnit.SECONDS))
- {
- // 没有获得锁,不进行任何操作。 避免死锁。
- return;
- }
- try
- {
- // Consume
- while (queue.isEmpty())
- {
- System.out.println("consumer: waiting for condition.");
- if (!emptyCondition.await(5, TimeUnit.SECONDS))
- {
- // 5秒没有等待到条件,不进行任何操作。
- // 避免中断后在此处等待。
- return;
- }
- }
- int x = queue.poll();
- System.out.printf("consumer: %d, queue=%d /n", x, queue.size());
- }
- finally
- {
- locker.unlock();
- }
- }
queue.put(x);
- System.out.println("producer: " + x);
- Thread.sleep((int)(Math.random() * 3000));
- }
- }
- catch (InterruptedException e)
- {
- }
- System.out.println("producer: end.");
- }
Consumer:
- public void run()
- {
- try
- {
- while (true)
- {
- int x = queue.take();
- System.out.println("consumer: " + x);
- Thread.sleep((int)(Math.random() * 3000));
- }
- }
- catch (InterruptedException e)
- {
- }
- System.out.println("consumer: end.");
- }
PART.8 处理Socket和ServerSocket
1. 处理ServerSocket的accept方法
- 调用ServerSocket的setSoTimeout方法设置超时时间。
- 捕获并处理超时引起的SocketTimeoutException。
- 不需要处理该异常。
- public void run()
- {
- try
- {
- // 设置超时时间
- serverSocket.setSoTimeout(2000);
- }
- catch (SocketException e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
- while (!Thread.currentThread().isInterrupted())
- {
- try
- {
- @SuppressWarnings("unused")
- Socket s = serverSocket.accept();
- }
- catch (SocketTimeoutException e)
- {
- // 超时,不进行任何处理,再次调用accept方法
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- // 清理工作
- }
2. 处理包装自Socket的BufferedReader的readLine方法
- 调用Socket的setSoTimeout方法设置超时时间。
- BufferedReader的readLine方法在阻塞指定的时间后将抛出SocketTimeoutException。
- 不需要处理该异常。
- public void run()
- {
- BufferedReader reader = null;
- try
- {
- // 建立测试用的链接
- serverSocket = new ServerSocket(10009);
- socket = new Socket("127.0.0.1", 10009);
- Thread.sleep(500);
- Socket s = serverSocket.accept();
- // 向socket发送5行数据
- OutputStreamWriter w = new OutputStreamWriter(
- s.getOutputStream());
- w.write("line1 /n line2 /n line3 /n line4 /n line5 /n");
- w.flush();
- // 设置超时
- socket.setSoTimeout(1000);
- // 创建BufferedReader
- reader = new BufferedReader(
- new InputStreamReader(socket.getInputStream()));
- }
- catch (IOException e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
- while (!Thread.currentThread().isInterrupted())
- {
- try
- {
- String s = reader.readLine();
- if (s == null)
- {
- // 流结束
- break;
- }
- // 输出读取的数据
- System.out.println("thread: " + s);
- }
- catch (SocketTimeoutException e)
- {
- System.out.println("thread: socket timeout.");
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- // 清理
- try { serverSocket.close(); } catch (Exception e) { }
- try { socket.close(); } catch (Exception e) { }
- System.out.println("thread: end.");
- }
3. 处理包装自Socket的ObjectInputStream的readObject方法
- 与readLine的处理方法类似。
- 调用Socket的setSoTimeout方法设置超时时间。
- ObjectInputStream的readObject方法在阻塞指定的时间后将抛出异常。
- 不需要处理该异常。
- public void run()
- {
- ObjectInputStream ois = null;
- try
- {
- // 建立测试用的链接
- serverSocket = new ServerSocket(10009);
- socket = new Socket("127.0.0.1", 10009);
- Thread.sleep(500);
- Socket s = serverSocket.accept();
- // 向socket发送3个对象
- ObjectOutputStream oos = new ObjectOutputStream(
- s.getOutputStream());
- oos.writeObject(new TestData("object 1"));
- oos.writeObject(new TestData("object 2"));
- oos.writeObject(new TestData("object 3"));
- // 设置超时
- socket.setSoTimeout(1000);
- // 创建ObjectInputStream
- ois = new ObjectInputStream(socket.getInputStream());
- }
- catch (IOException e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
- while (!Thread.currentThread().isInterrupted())
- {
- try
- {
- TestData s = (TestData)ois.readObject();
- if (s == null)
- {
- // 流结束
- break;
- }
- // 输出读取的数据
- System.out.println("thread: " + s.data);
- }
- catch (SocketTimeoutException e)
- {
- System.out.println("thread: socket timeout.");
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- catch (ClassNotFoundException e)
- {
- e.printStackTrace();
- }
- }
- // 清理
- try { serverSocket.close(); } catch (Exception e) { }
- try { socket.close(); } catch (Exception e) { }
- System.out.println("thread: end.");
- }
其中,TestData是一个简单的可序列化的类。
- private static class TestData implements Serializable
- {
- private static final long serialVersionUID = 6147773210845607198L;
- public String data;
- public TestData(String data)
- {
- this.data = data;
- }
- }
PART.9 总结
见“PART.2可中断的线程”和“PART.4 处理阻塞”。