目录

Operating System Chapter8 并发 bug 和应对


Operating System

$Nanjing\ University\rightarrow Yanyan\ Jiang\newline$

Overview

复习

  • 并发编程的基本工具:线程库、互斥和同步
  • 并发编程的应用场景:高性能计算、数据中心、网页/移动应用

本次课回答的问题

  • Q: 并发编程那么难,我写出 bug 怎么办?

本次课主要内容

  • 应对 bug (和并发 bug) 的方法
  • 死锁和数据竞争

应对 Bug 的方法

基本思路:否定你自己

虽然不太愿意承认,但始终假设自己的代码是错的。

然后呢?

  • 做好测试
  • 检查哪里错了
  • 再检查哪里错了
  • 再再检查哪里错了
    • (把任何你认为 “不对” 的情况都检查一遍)

Bug 多的根本原因:编程语言的缺陷

软件是需求 (规约) 在计算机数字世界的投影(关于变量的更多内容丢失了,只映射了一部分)

只管 “翻译” 代码,不管和实际需求 (规约) 是否匹配

  • 可以加入assert来进行证明

  • alipay.c的例子
    • 变量 balance 代表 “余额”
    • 怎么看 withdraw 以后 0 → 18446744073709551516 都不对
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
#include "thread.h"

unsigned long balance = 100;

void Alipay_withdraw(int amt) {
  if (balance >= amt) {
    usleep(1); // unexpected delays
    balance -= amt;
  }
}

void Talipay(int id) {
  Alipay_withdraw(100);
}

int main() {
  create(Talipay);
  create(Talipay);
  join();
  printf("balance = %lu\n", balance);
}

三十年后的编程语言和编程方法?

更实在的方法:防御性编程

把程序需要满足的条件用 assert 表达出来 $\Longrightarrow$ 和上面的防御性编程相对应

例子:peterson-barrier.c、二叉树的旋转

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include "thread.h"

#define A 1
#define B 2

#define BARRIER __sync_synchronize()

atomic_int nested;
atomic_long count;

void critical_section() {
  long cnt = atomic_fetch_add(&count, 1);
  int i = atomic_fetch_add(&nested, 1) + 1;
  if (i != 1) {
    printf("%d threads in the critical section @ count=%ld\n", i, cnt);
    assert(0);
  }
  atomic_fetch_add(&nested, -1);
}

int volatile x = 0, y = 0, turn;

void TA() {
  while (1) {
    x = 1;                   BARRIER;
    turn = B;                BARRIER; // <- this is critcal for x86
    while (1) {
      if (!y) break;         BARRIER;
      if (turn != B) break;  BARRIER;
    }
    critical_section();
    x = 0;                   BARRIER;
  }
}

void TB() {
  while (1) {
    y = 1;                   BARRIER;
    turn = A;                BARRIER;
    while (1) {
      if (!x) break;         BARRIER;
      if (turn != A) break;  BARRIER;
    }
    critical_section();
    y = 0;                   BARRIER;
  }
}

int main() {
  create(TA);
  create(TB);
}
/img/Operating System/chapter8-1.jpg
二叉树的旋转,维护的一些方法assert(y->parent == x'->parent && y->left == x && y->right == c);
/img/Operating System/chapter8-2.png
assert就是最有效的防御手段,相当于assert(i == 1);
  • 把assert写出来,你也就知道怎么写代码了

  • !!!面试的时候能写出干净利落的assert代码是一个重要的加分项,对增强代码的逻辑性和可读性以及debug都有很大帮助 $\Longrightarrow$ 没有人能够写出完美的代码,代码部分和assert是互相印证的,使得代码的可靠性上升了一个数量级

/img/Operating System/chapter8-3.gif
即使是Linus Torvalds声称自己的代码没有bug最后也被打脸
  • assert不一定需要绝对的正确,比如在上面的支付宝,如果测试过程中金额较小,我们完全可以使用assert(balance <= 100000);这样的代码
  • 还可以有这样的assert
1
2
3
4
5
6
7
void *ptr;
int pid;

//...

assert(ptr in heap);
assert(pid >= 0 && pid <= 1000);
  • 这些assert看起来和我们的现实世界的逻辑并没有关系,但是它防止了$Memory\ Error\newline$

防御性编程和规约给我们的启发

  • 你知道很多变量的含义
1
2
3
4
#define CHECK_INT(x, cond) \
  ({ panic_on(!((x) cond), "int check fail: " #x " " #cond); })
#define CHECK_HEAP(ptr) \
  ({ panic_on(!IN_RANGE((ptr), heap)); })
  • 变量有 “typed annotation”

    • CHECK_INT(waitlist->count, >= 0);

    • CHECK_INT(pid, < MAX_PROCS);

    • CHECK_HEAP(ctx->rip); CHECK_HEAP(ctx->cr3);

  • 变量含义改变 → 发生奇怪问题 (overflow, memory error, …)

    • 不要小看这些检查,它们在底层编程 (M2, L1, …) 时非常常见。尤其是C语言“裸奔”写OS内核,既没有语言层面的保护,也没有OS内核的保护(因为你写的就是这玩意),所以这些检查尤其重要!
    • 这些检查逐渐就从C语言演变成了集成度和安全性更高的一些C++标准库还有Rust的编译器(检查很严格,每次编译都直接叫跌)
    • 在虚拟机神秘重启/卡住/…前发出警报

并发 Bug:死锁 (Deadlock)

死锁 (Deadlock)

A deadlock is a state in which each member of a group is waiting for another member, including itself, to take action.

  • 出现线程 “互相等待” 的情况
/img/Operating System/chapter8-4.jpg
互相在等待(你等我,我等你,我等我自己)

AA-Deadlock

  • 假设你的 spinlock 不小心发生了中断

    • 在不该打开中断的时候开了中断

    • 在不该切换的时候执行了 yield()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
void os_run() {
  spin_lock(&list_lock);
  spin_lock(&xxx);
  spin_unlock(&xxx); // ---------+
}                          //    |
                           //    |
void on_interrupt() {      //    |
  spin_lock(&list_lock);   // <--+
  spin_unlock(&list_lock);
}
  • lock关闭中断,而unlock打开中断。上面的程序上了两把锁,却在只开一把锁的情况下打开了中断。$\Longrightarrow$ 中断需要等第一把锁释放,第一个unlock执行需要等中断返回才能执行完毕

ABBA-Deadlock

1
2
3
4
5
6
7
8
void swap(int i, int j) {
  spin_lock(&lock[i]);
  spin_lock(&lock[j]);
  arr[i] = NULL;
  arr[j] = arr[i];
  spin_unlock(&lock[j]);
  spin_unlock(&lock[i]);
}

上锁的顺序很重要……

  • swap本身看起来没有问题

    • swap(1, 2); swap(2, 3), swap(3, 1) → 死锁 $\Longrightarrow$ 如果三个并发执行并同时🔒住了1,2,3,那就和哲学家问题里面所有人都拿起左手的叉子一样变成了死锁
    • philosopher.c
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include "thread.h"
#include "thread-sync.h"

#define N 3
sem_t locks[N];

void Tphilosopher(int id) {
  int lhs = (N + id - 1) % N;
  int rhs = id % N;
  while (1) {
    P(&locks[lhs]);
    printf("T%d Got %d\n", id, lhs + 1);
    P(&locks[rhs]);
    printf("T%d Got %d\n", id, rhs + 1);
    V(&locks[lhs]);
    V(&locks[rhs]);
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < N; i++) {
      SEM_INIT(&locks[i], 1);
  }
  for (int i = 0; i < N; i++) {
    create(Tphilosopher);
  }
}

避免死锁(死锁产生的四个必要条件)

  • 死锁产生的四个必要条件 (Edward G. Coffman, 1971):

    • 互斥:一个资源每次只能被一个进程使用(例如🔒)

    • 请求与保持:一个进程请求资阻塞时,不释放已获得的资源(比如失败版的哲学家吃饭,右手拿叉子被阻塞的时候不会把左手拿到的叉子释放掉)

    • 不剥夺:进程已获得的资源不能强行剥夺(🔒没有优先级,获得之后任何人都拿不走)

    • 循环等待:若干进程之间形成头尾相接的循环等待资源关系(参考AA-Deadlock里面的mermaid图像以及十字路口的图像)。例子:有一组等待进程 {P0, P1,..., Pn}P0 等待的资源被 P1 占有,P1 等待的资源被 P2 占有,……,Pn-1 等待的资源被 Pn 占有,Pn 等待的资源被 P0 占有 $\rightarrow$ 画圈了

  • 注意 ⚠️ :这四个条件是产生死锁的 必要条件 ,也就是说只要系统发生死锁,这些条件必然成立,而只要上述条件之一不满足,就不会发生死锁

$$ \begin{align} &Deadlock \rightarrow Four\ conditions\newline &Deadlock \nleftarrow Four\ conditions\newline \end{align} $$

“理解了死锁的原因,尤其是产生死锁的四个必要条件,就可以最大可能地避免、预防和解除死锁。所以,在系统设计、进程调度等方面注意如何不让这四个必要条件成立,如何确定资源的合理分配算法,避免进程永久占据系统资源。此外,也要防止进程在处于等待状态的情况下占用资源。因此,对资源的分配要给予合理的规划。” ——Bullshit $\Longrightarrow$ 经典教科书废话,这四个条件想要破除非常困难

写一个模拟产生死锁的代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use std::{
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

fn main() {
    let x = Arc::new(Mutex::new(0));
    let y = Arc::new(Mutex::new(0));
    let x1 = Arc::clone(&x);
    let y1 = Arc::clone(&y);
    let x2 = Arc::clone(&x);
    let y2 = Arc::clone(&y);

    let handle1 = thread::spawn(move || {
        let x = x1.lock().unwrap();
        println!("handle1 has own x: {}", *x);
        thread::sleep(Duration::from_secs(100));
        let y = y1.lock().unwrap();
        println!("handle1 has own y: {}", *y);
    });

    let handle2 = thread::spawn(move || {
        let y = y2.lock().unwrap();
        println!("handle2 has own y: {}", *y);
        thread::sleep(Duration::from_secs(100));
        let x = x2.lock().unwrap();
        println!("handle2 has own x: {}", *x);
    });

    handle1.join().unwrap();
    handle2.join().unwrap();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class DeadLockDemo {
    private static Object resource1 = new Object();//资源 1
    private static Object resource2 = new Object();//资源 2

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (resource1) {
                System.out.println(Thread.currentThread() + "get resource1");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread() + "waiting get resource2");
                synchronized (resource2) {
                    System.out.println(Thread.currentThread() + "get resource2");
                }
            }
        }, "线程 1").start();

        new Thread(() -> {
            synchronized (resource2) {
                System.out.println(Thread.currentThread() + "get resource2");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread() + "waiting get resource1");
                synchronized (resource1) {
                    System.out.println(Thread.currentThread() + "get resource1");
                }
            }
        }, "线程 2").start();
    }
}

避免死锁 (cont’d)

  • AA-Deadlock

    • AA 型的死锁容易检测,及早报告,及早修复

    • spinlock-xv6.c 中的各种防御性编程

      • if (holding(lk)) panic(); $\Longrightarrow$ 如果这把🔒在上🔒的时候,线程已经得到了🔒,直接就panic(),然后crush。防止出现死🔒之后自己到处找,这种编程模式能够迅速帮你定位自己的缺陷和bug。
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
// Mutual exclusion spin locks.

#include "types.h"
#include "param.h"
#include "memlayout.h"
#include "spinlock.h"
#include "riscv.h"
#include "proc.h"
#include "defs.h"

void
initlock(struct spinlock *lk, char *name)
{
  lk->name = name;
  lk->locked = 0;
  lk->cpu = 0;
}

// Acquire the lock.
// Loops (spins) until the lock is acquired.
void
acquire(struct spinlock *lk)
{
  push_off(); // disable interrupts to avoid deadlock.
  if(holding(lk))
    panic("acquire");

  // On RISC-V, sync_lock_test_and_set turns into an atomic swap:
  //   a5 = 1
  //   s1 = &lk->locked
  //   amoswap.w.aq a5, a5, (s1)
  while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
    ;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that the critical section's memory
  // references happen strictly after the lock is acquired.
  // On RISC-V, this emits a fence instruction.
  __sync_synchronize();

  // Record info about lock acquisition for holding() and debugging.
  lk->cpu = mycpu();
}

// Release the lock.
void
release(struct spinlock *lk)
{
  if(!holding(lk))
    panic("release");

  lk->cpu = 0;

  // Tell the C compiler and the CPU to not move loads or stores
  // past this point, to ensure that all the stores in the critical
  // section are visible to other CPUs before the lock is released,
  // and that loads in the critical section occur strictly before
  // the lock is released.
  // On RISC-V, this emits a fence instruction.
  __sync_synchronize();

  // Release the lock, equivalent to lk->locked = 0.
  // This code doesn't use a C assignment, since the C standard
  // implies that an assignment might be implemented with
  // multiple store instructions.
  // On RISC-V, sync_lock_release turns into an atomic swap:
  //   s1 = &lk->locked
  //   amoswap.w zero, zero, (s1)
  __sync_lock_release(&lk->locked);

  pop_off();
}

// Check whether this cpu is holding the lock.
// Interrupts must be off.
int
holding(struct spinlock *lk)
{
  int r;
  r = (lk->locked && lk->cpu == mycpu());
  return r;
}

// push_off/pop_off are like intr_off()/intr_on() except that they are matched:
// it takes two pop_off()s to undo two push_off()s.  Also, if interrupts
// are initially off, then push_off, pop_off leaves them off.

void
push_off(void)
{
  int old = intr_get();

  intr_off();
  if(mycpu()->noff == 0)
    mycpu()->intena = old;
  mycpu()->noff += 1;
}

void
pop_off(void)
{
  struct cpu *c = mycpu();
  if(intr_get())
    panic("pop_off - interruptible");
  if(c->noff < 1)
    panic("pop_off");
  c->noff -= 1;
  if(c->noff == 0 && c->intena)
    intr_on();
}
  • ABBA-Deadlock

    • 任意时刻系统中的锁都是有限的

    • 严格按照固定的顺序获得所有锁 (lock ordering; 消除 “循环等待”)

      • 比如说有X,A,B,C四把🔒,无论如何都要给这四把🔒排一下顺序

    • 任何一个线程想要获得其中的任何几把🔒,都必须要按照排好的顺序来

    • 1
      2
      3
      4
      5
      6
      7
      
      lock(A);
      lock(C);
      //------> √ <(^-^)>
      
      lock(C);
      lock(A);
      //-----> × 
      
      • 遇事不决可视化:lock-ordering.py

      • 进而证明$@thread-T_1:A \rightarrow B \rightarrow C;@thread-T_2:B \rightarrow C$是安全的

        • “在任意时刻总是有获得 “最靠后” 锁的可以继续执行”
          • 即使有许多线程的情况下,总会有一个跑的最快的线程(也就是🔒的位置最靠右/编号最大,只有🔒最大的线程才能获得下一把🔒 $\Longrightarrow$ 即使后面的线程都卡死了,最前面的线程仍然“畅通无阻“。最大的线程走进临界区以后,它就会把所有的🔒都给释放掉。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class LockOrdering:
    locks = [ '', '', '' ]

    def tryacquire(self, lk):
        self.locks[lk], seen = '🔒', self.locks[lk]
        return seen == ''

    def release(self, lk):
        self.locks[lk] = ''

    @thread
    def t1(self):
        while True:
            while not self.tryacquire(0): pass
            while not self.tryacquire(1): pass
            while not self.tryacquire(2): pass
            self.release(0), self.release(1), self.release(2)

    @thread
    def t2(self):
        while True:
            while not self.tryacquire(1): pass
            while not self.tryacquire(2): pass
            self.release(1), self.release(2)

    @marker
    def mark_negative(self, state):
        pass

  • 总结:两条技术
    1. 防御编程 $\Longrightarrow$ AA-Deadlock
    2. lock-order $\Longrightarrow$ ABBA-Deadlock

八股补充

  • 死锁四大必要条件上面都已经列出来了,很显然,只要破坏四个必要条件中的任何一个就能够预防死锁的发生

  • 破坏第一个条件互斥条件:使得资源是可以同时访问的,这是种简单的方法,磁盘就可以用这种方法管理,但是我们要知道,有很多资源往往是不能同时访问的所以这种做法在大多数的场合是行不通的

  • 破坏第三个条件非抢占加入了优先级,可以剥夺互斥资源):也就是说可以采用剥夺式调度算法,但剥夺式调度方法目前一般仅适用于主存资源处理器资源的分配,并不适用于所有的资源,会导致资源利用率下降

策略
  • 考虑破坏第二个条件和第四个条件
  1. 静态分配策略

静态分配策略可以破坏死锁产生的第二个条件(占有并等待)。

所谓静态分配策略,就是指一个进程必须在执行前就申请到它所需要的全部资源,并且知道它所要的资源都得到满足之后才开始执行。进程要么占有所有的资源然后开始执行,要么不占有资源,不会出现占有一些资源等待一些资源的情况。(类似于哲学家吃饭的问题里面,一次性拿左右手餐具)

静态分配策略逻辑简单,实现也很容易,但这种策略严重地降低了资源利用率。$\Longrightarrow$ 因为在每个进程所占有的资源中,有些资源是在比较靠后的执行时间里采用的,甚至有些资源是在额外的情况下才使用的,这样就可能造成一个进程占有了一些几乎不用的资源而使其他需要该资源的进程产生等待的情况(没有拿全资源,占着拿到的资源不干活)

  1. 层次分配策略

层次分配策略破坏了产生死锁的第四个条件(循环等待)。

在层次分配策略下,所有的资源被分成了多个层次,一个进程得到某一次的一个资源后,它只能再申请较高一层的资源;当一个进程要释放某层的一个资源时,必须先释放所占用的较高层的资源,按这种策略,是不可能出现循环等待链的,因为那样的话,就出现了已经申请了较高层的资源,反而去申请了较低层的资源,不符合层次分配策略。参考避免死锁 (cont’d)

Lock Ordering: 应用 (Linux Kernel: rmap.c)

/img/Operating System/chapter8-5.png
Linux Kernel:rmap.c中的Lock Ordering

但是……你又 Naive 了……

Textbooks will tell you that if you always lock in the same order, you will never get this kind of deadlock. Practice will tell you that this approach doesn’t scale: when I create a new lock, I don’t understand enough of the kernel to figure out where in the 5000 lock hierarchy it will fit.

The best locks are encapsulated: they never get exposed in headers, and are never held around calls to non-trivial functions outside the same file. You can read through this code and see that it will never deadlock, because it never tries to grab another lock while it has that one. People using your code don’t even need to know you are using a lock.

—— Unreliable Guide to Locking by Rusty Russell

  • 我们稍后回到这个问题,继续看更多的 bugs

补充:银行家算法

/img/Operating System/chapter8-11.png
算法实现
/img/Operating System/chapter8-11.png
其他补充

银行家算法详情可见:《一句话+一张图说清楚——银行家算法》

并发 Bug:数据竞争 (Data Race) $\Longrightarrow$ 不上锁不就没有死锁了吗?

数据竞争

不同的线程同时访问同一段内存,且至少有一个是写

  • 两个内存访问在 “赛跑”,“跑赢” 的操作先执行 $\Longrightarrow$ 程序最终的结果与谁跑赢有关 $\Longrightarrow$ 计组中和的“读后写”与“写后读”有关的流水线排序问题

数据竞争 (cont’d)

  • Peterson 算法告诉大家:

    • 你们写不对无锁的并发程序

    • 所以事情反而简单了

用互斥锁保护好共享数据

消灭一切数据竞争

数据竞争:例子

  • 以下代码概括了你们遇到数据竞争的大部分情况
    • 不要笑,你们的 bug 几乎都是这两种情况的变种
1
2
3
// Case #1: 上错了锁
void thread1() { spin_lock(&lk1); sum++; spin_unlock(&lk1); }
void thread2() { spin_lock(&lk2); sum++; spin_unlock(&lk2); }
1
2
3
// Case #2: 忘记上锁
void thread1() { spin_lock(&lk1); sum++; spin_unlock(&lk1); }
void thread2() { sum++; }

更多类型的并发 Bug

程序员:花式犯错

  • 回顾我们实现并发控制的工具

    • 互斥锁 (lock/unlock) - 原子性

    • 条件变量 (wait/signal) - 同步

  • 忘记上锁——原子性违反 (Atomicity Violation, AV)

  • 忘记同步——顺序违反 (Order Violation, OV)

  • Empirical study: 在 105 个并发 bug 中 (non-deadlock/deadlock)

    • MySQL (14/9), Apache (13/4), Mozilla (41/16), OpenOffice (6/2)

    • 97% 的非死锁并发 bug 都是 AV 或 OV

原子性违反 (AV)

  • “ABA”
    • 我以为一段代码没啥事呢,但被人强势插入了
/img/Operating System/chapter8-6.png
ABA

原子性违反 (cont’d)

  • 有时候上锁也不解决问题
    • “TOCTTOU” - time of check to time of use
/img/Operating System/chapter8-7.png
TOCTTOU

顺序违反 (OV)

  • “BA”
    • 怎么就没按我预想的顺序来呢?
      • 例子:concurrent use after free
/img/Operating System/chapter8-8.png
BA

应对并发 Bug 的方法

完全一样的基本思路:否定你自己

还是得始终假设自己的代码是错的

  • 然后呢?

    • 做好测试

    • 检查哪里错了

    • 再检查哪里错了

    • 再再检查哪里错了

      • (把任何你认为 “不对” 的情况都检查一遍)
  • 例如:用 lock ordering 彻底避免死锁?

    • 你想多了:并发那么复杂,程序员哪能充分测试啊

Lockdep: 运行时的死锁检查

Lockdep 规约 (Specification)

  • 为每一个锁确定唯一的 “allocation site”
    • lock-site.c
    • assert: 同一个 allocation site 的锁存在全局唯一的上锁顺序
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>

typedef struct lock {
  int locked;
  const char *site;
} lock_t;

#define STRINGIFY(s) #s
#define TOSTRING(s)  STRINGIFY(s)
#define LOCK_INIT() \
  ( (lock_t) { .locked = 0, .site = __FILE__ ":" TOSTRING(__LINE__), } )

lock_t lk1 = LOCK_INIT();
lock_t lk2 = LOCK_INIT();

void lock(lock_t *lk) {
  printf("LOCK   %s\n", lk->site);
}

void unlock(lock_t *lk) {
  printf("UNLOCK %s\n", lk->site);
}

struct some_object {
  lock_t lock;
  int data;
};

void object_init(struct some_object *obj) {
  obj->lock = LOCK_INIT();
}

int main() {
  lock(&lk1);
  lock(&lk2);
  unlock(&lk1);
  unlock(&lk2);

  struct some_object *obj = malloc(sizeof(struct some_object));
  assert(obj);
  object_init(obj);
  lock(&obj->lock);

  lock(&lk2);
  lock(&lk1);
}
/img/Operating System/chapter8-9.png
ChatGPT
  • 检查方法:printf
    • 记录所有观察到的上锁顺序,例如

$$ [x,y,z] \Longrightarrow x \rightarrow y,x\rightarrow z,y\rightarrow z $$

  • 检查是否存在$x ⇝ y ∧ y ⇝ x$

  • 维护一个图,图里面不能有环(有环就违反了Lock-Order)

  • Lockdep 的实现

ThreadSanitizer: 运行时的数据竞争检查

  • 为所有事件建立 happens-before 关系图 $\Longrightarrow$ 图论

    • Program-order + release-acquire

    • 对于发生在不同线程且至少有一个是写的$x,y$检查

$$ x ≺ y ∨ y ≺ x $$

更多的检查:动态程序分析

  • 在事件发生时记录

    • Lockdep: lock/unlock

    • ThreadSanitizer: 内存访问 + lock/unlock

  • 解析记录检查问题

    • Lockdep: $x ⇝ y ∧ y ⇝ x$
    • ThreadSanitizer: $x \nprec y ∧ y \nprec x$
  • 付出的代价和权衡

    • 程序执行变慢

    • 但更容易找到 bug (因此在测试环境中常用)

动态分析工具:Sanitizers

1
2
3
4
5
6
7
8
9
#include <stdlib.h>

int main() {
    int *p = malloc(sizeof(int));
    *p = 1;
    free(p);
    *p = 1;
    return 0;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
gcc -g test.c -fsanitize=address -o test && ./test
=================================================================
==37384==ERROR: AddressSanitizer: heap-use-after-free on address 0x602000000010 at pc 0x55c0a903b213 bp 0x7ffe5d50fcd0 sp 0x7ffe5d50fcc8
WRITE of size 4 at 0x602000000010 thread T0
    #0 0x55c0a903b212 in main /home/kali/chapter7/test.c:7
    #1 0x7f8416046189 in __libc_start_call_main ../sysdeps/nptl/libc_start_call_main.h:58
    #2 0x7f8416046244 in __libc_start_main_impl ../csu/libc-start.c:381
    #3 0x55c0a903b0b0 in _start (/home/kali/chapter7/test+0x10b0)

0x602000000010 is located 0 bytes inside of 4-byte region [0x602000000010,0x602000000014)
freed by thread T0 here:
    #0 0x7f84162b76a8 in __interceptor_free ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:52
    #1 0x55c0a903b1db in main /home/kali/chapter7/test.c:6
    #2 0x7f8416046189 in __libc_start_call_main ../sysdeps/nptl/libc_start_call_main.h:58

previously allocated by thread T0 here:
    #0 0x7f84162b89cf in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:69
    #1 0x55c0a903b18a in main /home/kali/chapter7/test.c:4
    #2 0x7f8416046189 in __libc_start_call_main ../sysdeps/nptl/libc_start_call_main.h:58

SUMMARY: AddressSanitizer: heap-use-after-free /home/kali/chapter7/test.c:7 in main
Shadow bytes around the buggy address:
  0x0c047fff7fb0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c047fff7fc0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c047fff7fd0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c047fff7fe0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0c047fff7ff0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
=>0x0c047fff8000: fa fa[fd]fa fa fa fa fa fa fa fa fa fa fa fa fa
  0x0c047fff8010: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
  0x0c047fff8020: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
  0x0c047fff8030: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
  0x0c047fff8040: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
  0x0c047fff8050: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
Shadow byte legend (one shadow byte represents 8 application bytes):
  Addressable:           00
  Partially addressable: 01 02 03 04 05 06 07 
  Heap left redzone:       fa
  Freed heap region:       fd
  Stack left redzone:      f1
  Stack mid redzone:       f2
  Stack right redzone:     f3
  Stack after return:      f5
  Stack use after scope:   f8
  Global redzone:          f9
  Global init order:       f6
  Poisoned by user:        f7
  Container overflow:      fc
  Array cookie:            ac
  Intra object redzone:    bb
  ASan internal:           fe
  Left alloca redzone:     ca
  Right alloca redzone:    cb
==37384==ABORTING
/img/Operating System/chapter8-10.png
检查sum.c的thread

这不就是防御性编程吗?

  • 只不过不需要我亲自动手把代码改得乱七八糟了……

我们也可以!Buffer Overrun 检查

  • Canary (金丝雀) 对一氧化碳非常敏感

    • 用生命预警矿井下的瓦斯泄露 (since 1911)
  • 计算机系统中的 canary

    • “牺牲” 一些内存单元,来预警 memory error 的发生
      • (程序运行时没有动物受到实质的伤害)

Canary 的例子:保护栈空间 (M2/L2)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
#define MAGIC 0x55555555
#define BOTTOM (STK_SZ / sizeof(u32) - 1)
struct stack { char data[STK_SZ]; };

void canary_init(struct stack *s) {
  u32 *ptr = (u32 *)s;
  for (int i = 0; i < CANARY_SZ; i++)
    ptr[BOTTOM - i] = ptr[i] = MAGIC;
}

void canary_check(struct stack *s) {
  u32 *ptr = (u32 *)s;
  for (int i = 0; i < CANARY_SZ; i++) {
    panic_on(ptr[BOTTOM - i] != MAGIC, "underflow");
    panic_on(ptr[i] != MAGIC, "overflow");
  }
}

烫烫烫、屯屯屯和葺葺葺

  • msvc 中 debug mode 的 guard/fence/canary

    • 未初始化栈: 0xcccccccc

    • 未初始化堆: 0xcdcdcdcd

    • 对象头尾: 0xfdfdfdfd

    • 已回收内存: 0xdddddddd

  • (b'**\xcc**' * 80).decode('gb2312')

手持两把锟斤拷,口中疾呼烫烫烫

脚踏千朵屯屯屯,笑看万物锘锘锘

(它们一直在无形中保护你)

防御性编程:低配版 Lockdep

  • 不必大费周章记录什么上锁顺序
    • 统计当前的 spin count
      • 如果超过某个明显不正常的数值 (1,000,000,000) 就报告
1
2
3
4
5
6
int spin_cnt = 0;
while (xchg(&locked, 1)) {
  if (spin_cnt++ > SPIN_LIMIT) {
    printf("Too many spin @ %s:%d\n", __FILE__, __LINE__);
  }
}
  • 配合调试器和线程 backtrace 一秒诊断死锁

防御性编程:低配版 Sanitizer (L1)

  • 内存分配要求malloc and free:已分配内存$S=[l_0,r_0) \cup [l_1,r_1) \cup …$
    • $kalloc(S)$返回的$[l,r)$ 必须满足$[l,r) \cup S = \empty$
      • thread-local allocation + 并发的 free 还蛮容易弄错的
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// allocation
for (int i = 0; (i + 1) * sizeof(u32) <= size; i++) {
  panic_on(((u32 *)ptr)[i] == MAGIC, "double-allocation");//分配的时候看到了malloc颜色
  arr[i] = MAGIC; //刷上malloc颜色
}

// free
for (int i = 0; (i + 1) * sizeof(u32) <= alloc_size(ptr); i++) {
  panic_on(((u32 *)ptr)[i] == 0, "double-free");//分配的时候看到了free颜色
  arr[i] = 0; //刷上free颜色
}

补充:死锁检测 | 进程-资源分配图

  • 操作系统中的每一刻时刻的系统状态都可以用进程-资源分配图来表示,进程-资源分配图是描述进程和资源申请及分配关系的一种有向图,可用于检测系统是否处于死锁状态

  • 用一个方框表示每一个资源类,方框中的黑点表示该资源类中的各个资源,每个键进程用一个圆圈表示,有向边来表示进程申请资源和资源被分配的情况

  • 进程-资源分配图中存在环路并不一定是发生了死锁。因为循环等待资源仅仅是死锁发生的必要条件,而不是充分条件

  • 死锁检测步骤

  1. 如果进程-资源分配图中无环路,则此时系统没有发生死锁
  2. 如果进程-资源分配图中有环路,且每个资源类仅有一个资源,则系统中已经发生了死锁
  3. 如果进程-资源分配图中有环路,且涉及到的资源类有多个资源,此时系统未必会发生死锁。如果能在进程-资源分配图中找出一个既不阻塞又非独立的进程 ,该进程能够在有限的时间内归还占有的资源,也就是把边给消除掉了,重复此过程,直到能在有限的时间内消除所有的边 ,则不会发生死锁,否则会发生死锁。(消除边的过程类似于拓扑排序)
  • 死锁的解除

当死锁检测程序检测到存在死锁发生时,应设法让其解除,让系统从死锁状态中恢复过来,常用的解除死锁的方法有以下四种:

  1. 立即结束所有进程的执行,重新启动操作系统 :这种方法简单,但以前所在的工作全部作废,损失很大。
  2. 撤销涉及死锁的所有进程,解除死锁后继续运行 :这种方法能彻底打破死锁的循环等待条件,但将付出很大代价,例如有些进程可能已经计算了很长时间,由于被撤销而使产生的部分结果也被消除了,再重新执行时还要再次进行计算。
  3. 逐个撤销涉及死锁的进程,回收其资源直至死锁解除。
  4. 抢占资源 :从涉及死锁的一个或几个进程中抢占资源,把夺得的资源再分配给涉及死锁的进程直至死锁解除

总结

本次课回答的问题

  • Q: 如何拯救人类不擅长的并发编程?

Take-away message

  • 常见的并发 bug
    • 死锁、数据竞争、原子性/顺序违反
  • 不要盲目相信自己:检查、检查、检查
    • 防御性编程:检查
    • 动态分析:打印 + 检查

声明:本文章引用资料与图像均已做标注,如有侵权本人会马上删除