学习目标

  • 理解什么是线程安全以及为什么它在多线程编程中至关重要
  • 掌握共享资源访问时产生竞态条件的原因和表现
  • 学习识别典型的线程安全问题和并发风险
  • 通过实例学习如何解决多线程计数器问题

1. 什么是线程安全

1.1 线程安全的定义

线程安全是指在多线程环境下,程序能够正确地处理共享资源,确保数据的一致性和正确性。一个线程安全的程序,无论有多少线程并发执行,都不会出现数据破坏或者得到错误结果的情况。

  1. package org.devlive.tutorial.multithreading.chapter04;
  2. /**
  3. * 线程安全与非线程安全的对比
  4. */
  5. public class ThreadSafetyDemo {
  6. public static void main(String[] args) throws InterruptedException {
  7. // 演示线程不安全的计数器
  8. UnsafeCounter unsafeCounter = new UnsafeCounter();
  9. runCounterTest("线程不安全的计数器", unsafeCounter);
  10. // 演示线程安全的计数器
  11. SafeCounter safeCounter = new SafeCounter();
  12. runCounterTest("线程安全的计数器", safeCounter);
  13. }
  14. // 线程不安全的计数器
  15. static class UnsafeCounter {
  16. private int count = 0;
  17. public void increment() {
  18. count++; // 非原子操作
  19. }
  20. public int getCount() {
  21. return count;
  22. }
  23. }
  24. // 线程安全的计数器
  25. static class SafeCounter {
  26. private int count = 0;
  27. public synchronized void increment() {
  28. count++; // 使用synchronized确保原子性
  29. }
  30. public synchronized int getCount() {
  31. return count;
  32. }
  33. }
  34. // 测试计数器
  35. private static void runCounterTest(String name, Object counter) throws InterruptedException {
  36. // 创建多个线程同时增加计数器
  37. Thread[] threads = new Thread[10];
  38. for (int i = 0; i < threads.length; i++) {
  39. threads[i] = new Thread(() -> {
  40. for (int j = 0; j < 10000; j++) {
  41. if (counter instanceof UnsafeCounter) {
  42. ((UnsafeCounter) counter).increment();
  43. } else if (counter instanceof SafeCounter) {
  44. ((SafeCounter) counter).increment();
  45. }
  46. }
  47. });
  48. threads[i].start();
  49. }
  50. // 等待所有线程完成
  51. for (Thread thread : threads) {
  52. thread.join();
  53. }
  54. // 检查最终结果
  55. int finalCount = 0;
  56. if (counter instanceof UnsafeCounter) {
  57. finalCount = ((UnsafeCounter) counter).getCount();
  58. } else if (counter instanceof SafeCounter) {
  59. finalCount = ((SafeCounter) counter).getCount();
  60. }
  61. System.out.println(name + ": 预期结果=100000, 实际结果=" + finalCount +
  62. (finalCount == 100000 ? " (正确)" : " (错误)"));
  63. }
  64. }

📌 提示: 在上面的例子中,线程不安全的计数器通常会得到小于预期的结果,因为多个线程同时执行count++操作时会发生数据竞争。

1.2 线程安全的重要性

线程安全对于多线程程序至关重要,原因包括:

  1. 数据一致性:确保程序的数据状态始终保持一致,不会出现数据损坏或不一致的情况。

  2. 正确性:确保程序的行为符合预期,产生正确的结果。

  3. 避免难以调试的问题:线程安全问题通常表现为间歇性故障,很难复现和调试。

  4. 提高可靠性:线程安全的程序在各种并发场景下都能稳定运行,提高整体可靠性。

2. 共享资源访问的竞态条件

2.1 什么是竞态条件

竞态条件(Race Condition)是指当多个线程同时访问共享资源,并且尝试同时修改该资源时,最终的结果依赖于线程执行的顺序和时机,导致结果不可预测。

在多线程环境下,竞态条件主要出现在以下场景:

  1. 读-改-写 操作序列不是原子的,可能被其他线程中断
  2. 检查再执行 模式,在检查和执行之间状态可能发生变化
  3. 多步骤操作 不是以原子方式执行的

2.2 竞态条件示例

  1. package org.devlive.tutorial.multithreading.chapter04;
  2. /**
  3. * 竞态条件示例
  4. */
  5. public class RaceConditionDemo {
  6. // 银行账户类(线程不安全)
  7. static class BankAccount {
  8. private int balance; // 余额
  9. public BankAccount(int initialBalance) {
  10. this.balance = initialBalance;
  11. }
  12. // 存款方法
  13. public void deposit(int amount) {
  14. // 读取余额
  15. int current = balance;
  16. // 模拟处理延迟,使竞态条件更容易出现
  17. try {
  18. Thread.sleep(1);
  19. } catch (InterruptedException e) {
  20. Thread.currentThread().interrupt();
  21. }
  22. // 计算新余额并更新
  23. balance = current + amount;
  24. }
  25. // 取款方法
  26. public void withdraw(int amount) {
  27. // 读取余额
  28. int current = balance;
  29. // 模拟处理延迟
  30. try {
  31. Thread.sleep(1);
  32. } catch (InterruptedException e) {
  33. Thread.currentThread().interrupt();
  34. }
  35. // 只有余额充足才能取款
  36. if (current >= amount) {
  37. // 计算新余额并更新
  38. balance = current - amount;
  39. }
  40. }
  41. public int getBalance() {
  42. return balance;
  43. }
  44. }
  45. public static void main(String[] args) throws InterruptedException {
  46. // 创建账户,初始余额为1000
  47. BankAccount account = new BankAccount(1000);
  48. // 创建多个存款线程,每个存款100
  49. Thread[] depositThreads = new Thread[5];
  50. for (int i = 0; i < depositThreads.length; i++) {
  51. depositThreads[i] = new Thread(() -> {
  52. for (int j = 0; j < 10; j++) {
  53. account.deposit(100);
  54. }
  55. });
  56. }
  57. // 创建多个取款线程,每个取款100
  58. Thread[] withdrawThreads = new Thread[5];
  59. for (int i = 0; i < withdrawThreads.length; i++) {
  60. withdrawThreads[i] = new Thread(() -> {
  61. for (int j = 0; j < 10; j++) {
  62. account.withdraw(100);
  63. }
  64. });
  65. }
  66. // 启动所有线程
  67. System.out.println("启动线程,模拟账户并发操作...");
  68. System.out.println("初始余额: " + account.getBalance());
  69. for (Thread t : depositThreads) t.start();
  70. for (Thread t : withdrawThreads) t.start();
  71. // 等待所有线程完成
  72. for (Thread t : depositThreads) t.join();
  73. for (Thread t : withdrawThreads) t.join();
  74. // 检查最终余额
  75. System.out.println("所有操作完成后余额: " + account.getBalance());
  76. System.out.println("预期余额: 1000 (初始值) + 5*10*100 (存款) - 5*10*100 (取款) = 1000");
  77. }
  78. }

在上面的示例中,银行账户的存取款操作不是原子的,由于竞态条件,最终的余额可能不等于预期的1000。

⚠️ 警告: 竞态条件是多线程编程中最常见也是最危险的问题之一,因为它可能导致不可重现的错误和数据损坏。

2.3 竞态条件的类型

读-改-写竞态条件

  1. // 线程A和线程B同时执行这段代码:
  2. int temp = counter; // 读
  3. temp = temp + 1; // 改
  4. counter = temp; // 写

如果counter初始值为0,预期两个线程执行后counter应该是2,但可能会出现以下情况:

  1. 线程A读取counter为0
  2. 线程B读取counter为0
  3. 线程A计算temp + 1 = 1并更新counter为1
  4. 线程B计算temp + 1 = 1并更新counter为1

最终结果是1,而不是预期的2。

检查再执行竞态条件

  1. // 方法示例:转账
  2. public void transfer(Account to, double amount) {
  3. if (this.balance >= amount) { // 检查
  4. this.balance -= amount; // 执行
  5. to.balance += amount; // 执行
  6. }
  7. }

如果两个线程同时从同一账户转账,且账户余额仅足够一次转账,可能会出现以下情况:

  1. 线程A检查余额足够并准备转账
  2. 线程B检查余额足够并准备转账
  3. 线程A执行转账,减少余额
  4. 线程B执行转账,再次减少余额,可能导致余额为负

3. 线程安全问题的表现形式

3.1 原子性问题

当一个操作本应该是不可分割的,但在多线程环境下被拆分执行时,就会出现原子性问题。Java中许多看起来是一条语句的操作实际上并不是原子的。

  1. package org.devlive.tutorial.multithreading.chapter04;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. /**
  4. * 原子性问题示例
  5. */
  6. public class AtomicityProblemDemo {
  7. public static void main(String[] args) throws InterruptedException {
  8. // 测试非原子操作
  9. testCounter("非原子计数器", new NonAtomicCounter());
  10. // 测试使用原子变量的计数器
  11. testCounter("原子计数器", new AtomicCounter());
  12. }
  13. // 非原子计数器
  14. static class NonAtomicCounter {
  15. private int count = 0;
  16. public void increment() {
  17. count++; // 看似简单,实际上是读-改-写三步操作
  18. }
  19. public int getCount() {
  20. return count;
  21. }
  22. }
  23. // 使用原子变量的计数器
  24. static class AtomicCounter {
  25. private AtomicInteger count = new AtomicInteger(0);
  26. public void increment() {
  27. count.incrementAndGet(); // 原子操作
  28. }
  29. public int getCount() {
  30. return count.get();
  31. }
  32. }
  33. // 测试计数器
  34. private static void testCounter(String name, Object counter) throws InterruptedException {
  35. final int NUM_THREADS = 10;
  36. final int ITERATIONS = 100000;
  37. // 创建并启动多个线程递增计数器
  38. Thread[] threads = new Thread[NUM_THREADS];
  39. for (int i = 0; i < NUM_THREADS; i++) {
  40. threads[i] = new Thread(() -> {
  41. for (int j = 0; j < ITERATIONS; j++) {
  42. if (counter instanceof NonAtomicCounter) {
  43. ((NonAtomicCounter) counter).increment();
  44. } else if (counter instanceof AtomicCounter) {
  45. ((AtomicCounter) counter).increment();
  46. }
  47. }
  48. });
  49. threads[i].start();
  50. }
  51. // 等待所有线程完成
  52. for (Thread thread : threads) {
  53. thread.join();
  54. }
  55. // 检查结果
  56. int finalCount = 0;
  57. if (counter instanceof NonAtomicCounter) {
  58. finalCount = ((NonAtomicCounter) counter).getCount();
  59. } else if (counter instanceof AtomicCounter) {
  60. finalCount = ((AtomicCounter) counter).getCount();
  61. }
  62. int expectedCount = NUM_THREADS * ITERATIONS;
  63. System.out.println(name + " - 预期结果: " + expectedCount + ", 实际结果: " + finalCount +
  64. (finalCount == expectedCount ? " (正确)" : " (错误)"));
  65. }
  66. }

📌 提示: 在Java中,以下操作都不是原子的:

  • 递增/递减操作(如i++, i--
  • 复合赋值操作(如i += 5
  • 在volatile变量上执行的非赋值操作

3.2 可见性问题

可见性问题指的是一个线程对共享变量的修改,另一个线程可能看不到这个修改的最新值。这通常是由于CPU缓存、编译器优化或指令重排序导致的。

  1. package org.devlive.tutorial.multithreading.chapter04;
  2. /**
  3. * 可见性问题示例
  4. */
  5. public class VisibilityProblemDemo {
  6. // 没有volatile修饰的标志变量
  7. private static boolean stopRequested = false;
  8. // 使用volatile修饰的标志变量
  9. private static volatile boolean volatileStopRequested = false;
  10. public static void main(String[] args) throws InterruptedException {
  11. // 测试没有volatile的情况
  12. testVisibility(false);
  13. // 测试使用volatile的情况
  14. testVisibility(true);
  15. }
  16. private static void testVisibility(boolean useVolatile) throws InterruptedException {
  17. System.out.println("\n======== " + (useVolatile ? "使用volatile" : "不使用volatile") + " ========");
  18. // 重置标志
  19. stopRequested = false;
  20. volatileStopRequested = false;
  21. // 创建工作线程,不断检查标志变量
  22. Thread workerThread = new Thread(() -> {
  23. long i = 0;
  24. System.out.println("工作线程开始执行...");
  25. // 根据参数选择使用哪个标志变量
  26. if (useVolatile) {
  27. while (!volatileStopRequested) {
  28. i++;
  29. }
  30. } else {
  31. while (!stopRequested) {
  32. i++;
  33. }
  34. }
  35. System.out.println("工作线程检测到停止信号,循环次数:" + i);
  36. });
  37. workerThread.start();
  38. // 主线程等待一会儿
  39. Thread.sleep(1000);
  40. System.out.println("主线程设置停止信号...");
  41. // 设置停止标志
  42. if (useVolatile) {
  43. volatileStopRequested = true;
  44. } else {
  45. stopRequested = true;
  46. }
  47. // 等待工作线程结束
  48. workerThread.join(5000);
  49. // 检查线程是否还活着
  50. if (workerThread.isAlive()) {
  51. System.out.println("工作线程仍在运行!可能存在可见性问题");
  52. workerThread.interrupt(); // 强制中断
  53. } else {
  54. System.out.println("工作线程正确终止");
  55. }
  56. }
  57. }

在不使用volatile的情况下,由于可见性问题,工作线程可能无法看到主线程对stopRequested的修改,导致它无限循环下去。

📌 提示: Java提供了volatile关键字来解决可见性问题。被volatile修饰的变量,对它的读写都会直接在主内存中进行,而不是在CPU缓存中。

3.3 有序性问题

有序性问题是指程序的执行顺序可能与代码编写的顺序不同。这是由于编译器优化、CPU指令重排序等导致的。在单线程环境下,重排序后的执行结果与顺序执行的结果相同,但在多线程环境下可能导致问题。

  1. package org.devlive.tutorial.multithreading.chapter04;
  2. /**
  3. * 有序性问题示例
  4. */
  5. public class OrderingProblemDemo {
  6. private static int x = 0, y = 0;
  7. private static int a = 0, b = 0;
  8. public static void main(String[] args) throws InterruptedException {
  9. int iterations = 0;
  10. int abnormalResults = 0;
  11. // 多次运行测试,统计出现重排序的次数
  12. final int TEST_COUNT = 100000;
  13. System.out.println("开始测试重排序问题,运行 " + TEST_COUNT + " 次...");
  14. for (int i = 0; i < TEST_COUNT; i++) {
  15. iterations++;
  16. // 重置变量
  17. x = 0; y = 0;
  18. a = 0; b = 0;
  19. // 创建线程1
  20. Thread thread1 = new Thread(() -> {
  21. a = 1; // 语句1
  22. x = b; // 语句2
  23. });
  24. // 创建线程2
  25. Thread thread2 = new Thread(() -> {
  26. b = 1; // 语句3
  27. y = a; // 语句4
  28. });
  29. // 启动线程
  30. thread1.start();
  31. thread2.start();
  32. // 等待线程结束
  33. thread1.join();
  34. thread2.join();
  35. // 检查结果
  36. if (x == 0 && y == 0) {
  37. abnormalResults++;
  38. // 因为出现频率较低,只在前几次出现时打印详细信息
  39. if (abnormalResults <= 10) {
  40. System.out.println("检测到可能的指令重排序: x=" + x + ", y=" + y);
  41. }
  42. }
  43. }
  44. // 统计结果
  45. System.out.println("\n测试完成:");
  46. System.out.println("总测试次数: " + iterations);
  47. System.out.println("检测到的异常结果次数(x=0, y=0): " + abnormalResults);
  48. System.out.println("异常结果比例: " + String.format("%.5f%%", (double)abnormalResults / iterations * 100));
  49. System.out.println("\n分析:");
  50. if (abnormalResults > 0) {
  51. System.out.println("检测到可能的指令重排序。在某些情况下,两个线程中的操作可能被重排序,导致x=0且y=0。");
  52. System.out.println("这表明Java内存模型允许某些指令重排序,可能影响多线程程序的执行结果。");
  53. } else {
  54. System.out.println("未检测到明显的指令重排序。这可能是由于:");
  55. System.out.println("1. 当前硬件和JVM实现中不太容易观察到这种重排序");
  56. System.out.println("2. 测试次数不够多");
  57. System.out.println("但这并不意味着指令重排序不存在。在复杂的多线程程序中,它仍然可能导致问题。");
  58. }
  59. }
  60. }

在上面的例子中,如果没有指令重排序,理论上xy不可能同时为0。但由于指令重排序的存在,它们确实可能同时为0。

📌 提示: Java内存模型通过happens-before关系保证有序性。此外,volatilesynchronizedfinal关键字也能在一定程度上阻止有害的指令重排序。

4. 实战案例:商品库存管理

下面我们通过一个商品库存管理的实战案例,来综合应用所学的线程安全知识:

  1. package org.devlive.tutorial.multithreading.chapter04;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.concurrent.ConcurrentHashMap;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. import java.util.concurrent.locks.ReadWriteLock;
  7. import java.util.concurrent.locks.ReentrantReadWriteLock;
  8. /**
  9. * 商品库存管理实战案例
  10. */
  11. public class InventoryManagementDemo {
  12. // 线程不安全的库存管理
  13. static class UnsafeInventory {
  14. private final Map<String, Integer> productStock = new HashMap<>();
  15. // 添加或更新库存
  16. public void updateStock(String productId, int quantity) {
  17. Integer currentQuantity = productStock.get(productId);
  18. if (currentQuantity == null) {
  19. productStock.put(productId, quantity);
  20. } else {
  21. productStock.put(productId, currentQuantity + quantity);
  22. }
  23. }
  24. // 减少库存(如果库存不足,返回false)
  25. public boolean decreaseStock(String productId, int quantity) {
  26. Integer currentQuantity = productStock.get(productId);
  27. if (currentQuantity == null || currentQuantity < quantity) {
  28. return false;
  29. }
  30. productStock.put(productId, currentQuantity - quantity);
  31. return true;
  32. }
  33. // 获取当前库存
  34. public Integer getStock(String productId) {
  35. return productStock.getOrDefault(productId, 0);
  36. }
  37. // 获取所有商品库存
  38. public Map<String, Integer> getAllStock() {
  39. return new HashMap<>(productStock);
  40. }
  41. }
  42. // 使用synchronized的线程安全库存管理
  43. static class SynchronizedInventory {
  44. private final Map<String, Integer> productStock = new HashMap<>();
  45. // 添加或更新库存
  46. public synchronized void updateStock(String productId, int quantity) {
  47. Integer currentQuantity = productStock.get(productId);
  48. if (currentQuantity == null) {
  49. productStock.put(productId, quantity);
  50. } else {
  51. productStock.put(productId, currentQuantity + quantity);
  52. }
  53. }
  54. // 减少库存(如果库存不足,返回false)
  55. public synchronized boolean decreaseStock(String productId, int quantity) {
  56. Integer currentQuantity = productStock.get(productId);
  57. if (currentQuantity == null || currentQuantity < quantity) {
  58. return false;
  59. }
  60. productStock.put(productId, currentQuantity - quantity);
  61. return true;
  62. }
  63. // 获取当前库存
  64. public synchronized Integer getStock(String productId) {
  65. return productStock.getOrDefault(productId, 0);
  66. }
  67. // 获取所有商品库存
  68. public synchronized Map<String, Integer> getAllStock() {
  69. return new HashMap<>(productStock);
  70. }
  71. }
  72. // 使用ConcurrentHashMap和AtomicInteger的线程安全库存管理
  73. static class ConcurrentInventory {
  74. private final Map<String, AtomicInteger> productStock = new ConcurrentHashMap<>();
  75. // 添加或更新库存
  76. public void updateStock(String productId, int quantity) {
  77. productStock.computeIfAbsent(productId, k -> new AtomicInteger(0))
  78. .addAndGet(quantity);
  79. }
  80. // 减少库存(如果库存不足,返回false)
  81. public boolean decreaseStock(String productId, int quantity) {
  82. AtomicInteger stock = productStock.get(productId);
  83. if (stock == null) {
  84. return false;
  85. }
  86. int currentValue;
  87. do {
  88. currentValue = stock.get();
  89. if (currentValue < quantity) {
  90. return false;
  91. }
  92. } while (!stock.compareAndSet(currentValue, currentValue - quantity));
  93. return true;
  94. }
  95. // 获取当前库存
  96. public Integer getStock(String productId) {
  97. AtomicInteger stock = productStock.get(productId);
  98. return stock != null ? stock.get() : 0;
  99. }
  100. // 获取所有商品库存
  101. public Map<String, Integer> getAllStock() {
  102. Map<String, Integer> result = new HashMap<>();
  103. productStock.forEach((key, value) -> result.put(key, value.get()));
  104. return result;
  105. }
  106. }
  107. // 使用读写锁的线程安全库存管理
  108. static class ReadWriteLockInventory {
  109. private final Map<String, Integer> productStock = new HashMap<>();
  110. private final ReadWriteLock lock = new ReentrantReadWriteLock();
  111. // 添加或更新库存(写操作)
  112. public void updateStock(String productId, int quantity) {
  113. lock.writeLock().lock();
  114. try {
  115. Integer currentQuantity = productStock.get(productId);
  116. if (currentQuantity == null) {
  117. productStock.put(productId, quantity);
  118. } else {
  119. productStock.put(productId, currentQuantity + quantity);
  120. }
  121. } finally {
  122. lock.writeLock().unlock();
  123. }
  124. }
  125. // 减少库存(写操作)
  126. public boolean decreaseStock(String productId, int quantity) {
  127. lock.writeLock().lock();
  128. try {
  129. Integer currentQuantity = productStock.get(productId);
  130. if (currentQuantity == null || currentQuantity < quantity) {
  131. return false;
  132. }
  133. productStock.put(productId, currentQuantity - quantity);
  134. return true;
  135. } finally {
  136. lock.writeLock().unlock();
  137. }
  138. }
  139. // 获取当前库存(读操作)
  140. public Integer getStock(String productId) {
  141. lock.readLock().lock();
  142. try {
  143. return productStock.getOrDefault(productId, 0);
  144. } finally {
  145. lock.readLock().unlock();
  146. }
  147. }
  148. // 获取所有商品库存(读操作)
  149. public Map<String, Integer> getAllStock() {
  150. lock.readLock().lock();
  151. try {
  152. return new HashMap<>(productStock);
  153. } finally {
  154. lock.readLock().unlock();
  155. }
  156. }
  157. }
  158. public static void main(String[] args) throws InterruptedException {
  159. // 创建不同类型的库存管理器
  160. UnsafeInventory unsafeInventory = new UnsafeInventory();
  161. SynchronizedInventory syncInventory = new SynchronizedInventory();
  162. ConcurrentInventory concurrentInventory = new ConcurrentInventory();
  163. ReadWriteLockInventory rwlInventory = new ReadWriteLockInventory();
  164. // 初始化库存
  165. String[] products = {"iPhone", "MacBook", "iPad", "AirPods"};
  166. for (String product : products) {
  167. unsafeInventory.updateStock(product, 1000);
  168. syncInventory.updateStock(product, 1000);
  169. concurrentInventory.updateStock(product, 1000);
  170. rwlInventory.updateStock(product, 1000);
  171. }
  172. // 测试线程不安全的库存管理
  173. System.out.println("测试线程不安全的库存管理");
  174. testInventory(unsafeInventory, products);
  175. // 测试synchronized的库存管理
  176. System.out.println("\n测试synchronized的库存管理");
  177. testInventory(syncInventory, products);
  178. // 测试ConcurrentHashMap的库存管理
  179. System.out.println("\n测试ConcurrentHashMap的库存管理");
  180. testInventory(concurrentInventory, products);
  181. // 测试ReadWriteLock的库存管理
  182. System.out.println("\n测试ReadWriteLock的库存管理");
  183. testInventory(rwlInventory, products);
  184. }
  185. private static void testInventory(Object inventory, String[] products) throws InterruptedException {
  186. // 创建多个购买线程(减少库存)
  187. Thread[] buyThreads = new Thread[10];
  188. for (int i = 0; i < buyThreads.length; i++) {
  189. buyThreads[i] = new Thread(() -> {
  190. for (int j = 0; j < 100; j++) {
  191. String product = products[j % products.length];
  192. boolean success = false;
  193. if (inventory instanceof UnsafeInventory) {
  194. success = ((UnsafeInventory) inventory).decreaseStock(product, 1);
  195. } else if (inventory instanceof SynchronizedInventory) {
  196. success = ((SynchronizedInventory) inventory).decreaseStock(product, 1);
  197. } else if (inventory instanceof ConcurrentInventory) {
  198. success = ((ConcurrentInventory) inventory).decreaseStock(product, 1);
  199. } else if (inventory instanceof ReadWriteLockInventory) {
  200. success = ((ReadWriteLockInventory) inventory).decreaseStock(product, 1);
  201. }
  202. if (!success) {
  203. System.out.println("购买失败: " + product + " - 库存不足");
  204. }
  205. }
  206. });
  207. }
  208. // 创建多个补货线程(增加库存)
  209. Thread[] restockThreads = new Thread[5];
  210. for (int i = 0; i < restockThreads.length; i++) {
  211. restockThreads[i] = new Thread(() -> {
  212. for (int j = 0; j < 40; j++) {
  213. String product = products[j % products.length];
  214. if (inventory instanceof UnsafeInventory) {
  215. ((UnsafeInventory) inventory).updateStock(product, 5);
  216. } else if (inventory instanceof SynchronizedInventory) {
  217. ((SynchronizedInventory) inventory).updateStock(product, 5);
  218. } else if (inventory instanceof ConcurrentInventory) {
  219. ((ConcurrentInventory) inventory).updateStock(product, 5);
  220. } else if (inventory instanceof ReadWriteLockInventory) {
  221. ((ReadWriteLockInventory) inventory).updateStock(product, 5);
  222. }
  223. }
  224. });
  225. }
  226. // 创建多个查询线程(读取库存)
  227. Thread[] queryThreads = new Thread[20];
  228. for (int i = 0; i < queryThreads.length; i++) {
  229. queryThreads[i] = new Thread(() -> {
  230. for (int j = 0; j < 50; j++) {
  231. String product = products[j % products.length];
  232. int stock = 0;
  233. if (inventory instanceof UnsafeInventory) {
  234. stock = ((UnsafeInventory) inventory).getStock(product);
  235. } else if (inventory instanceof SynchronizedInventory) {
  236. stock = ((SynchronizedInventory) inventory).getStock(product);
  237. } else if (inventory instanceof ConcurrentInventory) {
  238. stock = ((ConcurrentInventory) inventory).getStock(product);
  239. } else if (inventory instanceof ReadWriteLockInventory) {
  240. stock = ((ReadWriteLockInventory) inventory).getStock(product);
  241. }
  242. // 不打印库存信息,避免输出过多
  243. }
  244. });
  245. }
  246. // 记录开始时间
  247. long startTime = System.currentTimeMillis();
  248. // 启动所有线程
  249. for (Thread t : buyThreads) t.start();
  250. for (Thread t : restockThreads) t.start();
  251. for (Thread t : queryThreads) t.start();
  252. // 等待所有线程完成
  253. for (Thread t : buyThreads) t.join();
  254. for (Thread t : restockThreads) t.join();
  255. for (Thread t : queryThreads) t.join();
  256. // 计算耗时
  257. long endTime = System.currentTimeMillis();
  258. // 输出最终库存和执行时间
  259. System.out.println("执行时间: " + (endTime - startTime) + "ms");
  260. System.out.println("最终库存:");
  261. Map<String, Integer> finalStock = null;
  262. if (inventory instanceof UnsafeInventory) {
  263. finalStock = ((UnsafeInventory) inventory).getAllStock();
  264. } else if (inventory instanceof SynchronizedInventory) {
  265. finalStock = ((SynchronizedInventory) inventory).getAllStock();
  266. } else if (inventory instanceof ConcurrentInventory) {
  267. finalStock = ((ConcurrentInventory) inventory).getAllStock();
  268. } else if (inventory instanceof ReadWriteLockInventory) {
  269. finalStock = ((ReadWriteLockInventory) inventory).getAllStock();
  270. }
  271. if (finalStock != null) {
  272. for (String product : products) {
  273. System.out.println(product + ": " + finalStock.get(product));
  274. }
  275. }
  276. // 验证库存一致性
  277. int expectedBaseline = 1000; // 初始库存
  278. int buyOps = buyThreads.length * 100; // 总购买操作
  279. int restockOps = restockThreads.length * 40 * 5; // 总补货操作
  280. // 平均到每种商品
  281. int opsPerProduct = (buyOps - restockOps) / products.length;
  282. int expectedStock = expectedBaseline - opsPerProduct;
  283. System.out.println("\n库存检查:");
  284. System.out.println("每种商品预期变化: " + opsPerProduct + " (负数表示库存减少)");
  285. System.out.println("预期最终库存: " + expectedStock);
  286. }
  287. }

在这个案例中,我们实现了四种不同的库存管理方式,并比较了它们在多线程环境下的表现:

  1. 不安全的库存管理(使用HashMap)
  2. 使用synchronized的库存管理
  3. 使用ConcurrentHashMap和AtomicInteger的库存管理
  4. 使用读写锁的库存管理

测试结果可能显示线程不安全的实现存在数据不一致的问题,而其他三种线程安全的实现能够保证数据的正确性,但性能各有差异。

常见问题与解决方案

问题1:多个操作需要作为一个原子单元执行

问题描述: 有时我们需要确保多个操作作为一个不可分割的单元执行,以保持数据的一致性。

解决方案: 使用锁(synchronized或显式锁)来确保互斥访问。

  1. // 使用synchronized
  2. public synchronized boolean transferMoney(Account from, Account to, double amount) {
  3. if (from.getBalance() < amount) {
  4. return false;
  5. }
  6. from.debit(amount);
  7. to.credit(amount);
  8. return true;
  9. }
  10. // 使用显式锁
  11. public boolean transferMoney(Account from, Account to, double amount) {
  12. lock.lock();
  13. try {
  14. if (from.getBalance() < amount) {
  15. return false;
  16. }
  17. from.debit(amount);
  18. to.credit(amount);
  19. return true;
  20. } finally {
  21. lock.unlock();
  22. }
  23. }

问题2:频繁读取但很少修改的数据

问题描述: 有些数据结构被频繁读取但很少修改,使用互斥锁会导致读操作也被阻塞,影响性能。

解决方案: 使用读写锁(ReadWriteLock)允许多个读操作并发执行,而写操作依然互斥。

  1. private final Map<String, Product> productCatalog = new HashMap<>();
  2. private final ReadWriteLock lock = new ReentrantReadWriteLock();
  3. // 读操作(多个线程可以同时读取)
  4. public Product getProduct(String productId) {
  5. lock.readLock().lock();
  6. try {
  7. return productCatalog.get(productId);
  8. } finally {
  9. lock.readLock().unlock();
  10. }
  11. }
  12. // 写操作(互斥访问)
  13. public void updateProduct(Product product) {
  14. lock.writeLock().lock();
  15. try {
  16. productCatalog.put(product.getId(), product);
  17. } finally {
  18. lock.writeLock().unlock();
  19. }
  20. }

问题3:在检查和执行之间保持一致性

问题描述: 在执行某个操作前需要先检查条件,但在多线程环境下,检查和执行之间的状态可能已经变化。

解决方案: 使用锁确保检查和执行的原子性,或者使用原子变量的CAS操作。

  1. // 使用锁确保原子性
  2. public synchronized boolean decrementIfPositive(int[] array, int index) {
  3. if (array[index] > 0) {
  4. array[index]--;
  5. return true;
  6. }
  7. return false;
  8. }
  9. // 使用AtomicIntegerArray的CAS操作
  10. private final AtomicIntegerArray atomicArray = new AtomicIntegerArray(size);
  11. public boolean decrementIfPositive(int index) {
  12. while (true) {
  13. int current = atomicArray.get(index);
  14. if (current <= 0) {
  15. return false;
  16. }
  17. if (atomicArray.compareAndSet(index, current, current - 1)) {
  18. return true;
  19. }
  20. // 如果CAS失败,循环重试
  21. }
  22. }

小结

在本章中,我们深入探讨了线程安全问题,学习了以下关键内容:

  1. 线程安全的概念:线程安全指在多线程环境下,程序能够正确处理共享资源,确保数据的一致性和正确性。

  2. 竞态条件:当多个线程同时访问共享资源并尝试修改时,结果依赖于线程执行的顺序和时机,可能导致不可预测的结果。

  3. 线程安全问题的三种表现

    • 原子性问题:操作被拆分执行,导致数据不一致
    • 可见性问题:一个线程的修改对其他线程不可见
    • 有序性问题:指令重排序导致的执行顺序问题
  4. 解决方案

    • 使用synchronized关键字确保互斥访问
    • 使用原子变量(如AtomicInteger)进行原子操作
    • 使用显式锁(如ReentrantLock)提供更灵活的锁机制
    • 使用读写锁(ReadWriteLock)优化读多写少的场景
    • 使用线程安全的集合类(如ConcurrentHashMap
  5. 实战应用:通过商品库存管理案例,我们实践了不同的线程安全实现方式,并比较了它们的性能和正确性。

理解和解决线程安全问题是多线程编程的核心挑战。通过合理使用Java提供的同步工具,我们可以开发出高效、可靠的多线程应用程序。

在下一章,我们将详细探讨synchronized关键字的使用和底层实现原理。

本章节源代码地址为 https://github.com/qianmoQ/tutorial/tree/main/java-multithreading-tutorial/src/main/java/org/devlive/tutorial/multithreading/chapter04