目录

Operating System Chapter4 理解并发程序执行


Operating System

$Nanjing\ University\rightarrow Yanyan\ Jiang\newline$

Update

晚上看的,Peterson算法好™️的👨,证明等过两天有时间给看了

理解并发程序执行

复习

  • 一般程序执行

线程的栈帧会用一个list来存放

  • 多线程

T1,T2随机来回转换执行

  • 并发程序 = 多个执行流、共享内存的状态机

画状态机理解并发程序

互斥:保证两个线程不能同时执行一段代码

  • 插入 “神秘代码”,使得 sum.c (或者任意其他代码) 能够正常工作
1
2
3
4
5
void Tsum() {
  // 神秘代码
  sum++;
  // 神秘代码
}
  • 可以通过__sync_synchronize();来保证原子操作
1
2
3
__sync_synchronize();
x = 1; // 或 int t = x;
__sync_synchronize();

__sync_synchronize();是GCC内置函数的一种,用于在编写多线程程序时确保在内存操作之前和之后的指令都不会被重排。

具体来说,这个函数是一个内存栅栏,用于告诉编译器不要把本条指令前面和后面的内存操作顺序交换,也就是防止编译器进行指令重排,保证在这个函数之前和之后的内存操作按照代码中的顺序执行。

这个函数在实现多线程锁、原子操作等场景中经常被使用,以确保线程间的同步和一致性。

失败的尝试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
int locked = UNLOCK;

void critical_section() {
retry:
  if (locked != UNLOCK) {
    goto retry;
  }
  locked = LOCK;

  // critical section

  locked = UNLOCK;
}
  • 原因

    • 看到的状态到真正做下一件事之间的状态是否被人改了?(看到的东西仅仅只是一个历史,离做还有几个周期,而做是根据这个“历史”的状态来决定的)$\Longrightarrow$ 和人眼看着东西对比,人眼是一直在看的,而CPU并行执行程序的时候,看这个指令执行完了之后可能CPU会去执行另一个线程的几条指令,这种情况下相当于这个人看完后闭上眼睛等了几秒,然后根据几秒前所看的东西来判断自己要干啥,可是这个东西在前几秒可能已经让另一个线程的某些指令改过了
    • 处理器默认不保证 load + store 的原子性(单操作可以保证)
  • 一种失败的情况

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "thread.h"

#define N 100000000

#define LOCK 1
#define UNLOCK 0

long sum = 0;

int locked = UNLOCK;

void Tsum() {
retry:  //T2在这个时候也执行到了locked != UNLOCK的判断,而lock = LOCK,T2也进去了,没锁住!
  if (locked != UNLOCK) { //<--------------------------------
    goto retry;                                        //  |
  }                                                    //  |  
  locked = LOCK;       //T1执行到了这里 此时locked = UNLOCK-|
  for (int i = 0; i < N; i++) {
    sum++;
  }
  locked = UNLOCK;
}

int main() {
  create(Tsum);
  create(Tsum);
  join();
  printf("sum = %ld\n", sum);
}

正确性不明的奇怪尝试 (Peterson 算法)

  • A 和 B 争用厕所的包厢

  • 想进入包厢之前,A/B 都要先举起自己的旗子

    • A 确认旗子举好以后,往厕所门上贴上 “B 正在使用” 的标签
    • B 确认旗子举好以后,往厕所门上贴上 “A 正在使用” 的标签
  • 然后如果对方的旗子举起来,且门上的名字不是自己,等待

    • 否则可以进入包厢
  • 出包厢后,放下自己的旗子

  • 示例代码:peterson-simple.c

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include "thread.h"

#define A 1
#define B 2

atomic_int nested;
atomic_long count;

void critical_section()
{
  long cnt = atomic_fetch_add(&count, 1);
  assert(atomic_fetch_add(&nested, 1) == 0);
  atomic_fetch_add(&nested, -1);
}

int volatile x = 0, y = 0, turn = A;

void TA()
{
  while (1)
  {
    /* PC=1 */ x = 1;
    /* PC=2 */ turn = B;
    /* PC=3 */ while (y && turn == B)
      ;
    critical_section();
    /* PC=4 */ x = 0;
  }
}

void TB()
{
  while (1)
  {
    /* PC=1 */ y = 1;
    /* PC=2 */ turn = A;
    /* PC=3 */ while (x && turn == A)
      ;
    critical_section();
    /* PC=4 */ y = 0;
  }
}

int main()
{
  create(TA);
  create(TB);
}

有关上面的原子操作:

这段代码定义了两个原子变量 nestedcount,并实现了一个临界区 critical_section()

critical_section() 中,首先通过调用 atomic_fetch_add(&count, 1)count 原子变量的值加 1 并返回当前的值,保存在 cnt 中。

然后通过调用 atomic_fetch_add(&nested, 1)nested 原子变量的值加 1,同时检查原子变量 nested 是否为 0。由于 nested 的初始值为 0,因此这里可以通过 assert 断言来验证。

最后通过调用 atomic_fetch_add(&nested, -1)nested 原子变量的值减 1。

整个 critical_section() 实现了一个简单的临界区,其中 nested 原子变量用于保证临界区同时只能被一个线程访问,而 count 原子变量用于记录临界区的进入次数。


在给 nested 这个 atomic_int 对象赋初值为 0 的情况下,断言 assert(atomic_fetch_add(&nested, 1) == 0) 是可以通过的。

这是因为 atomic_fetch_add() 函数是原子的,它会将 nested 的值加 1,并返回增加前的值。在这个代码中,nested 初始值为 0,然后通过 atomic_fetch_add(&nested, 1) 将其增加为 1,并返回增加前的值 0。因此,断言 assert(atomic_fetch_add(&nested, 1) == 0) 会成功通过,因为 atomic_fetch_add() 返回的值与断言中的比较值相等。

需要注意的是,atomic_int 是 C++ 标准库提供的原子类型,用于在多线程环境下进行原子操作,确保线程安全性。在多线程环境下,原子操作是不会被中断的,因此可以保证 atomic_int 类型对象的操作不会发生竞态条件等问题。但仍然需要谨慎使用,并根据具体情况考虑是否需要使用其他同步机制,如互斥锁、条件变量等。

Peterson’s Protocol Verified 🎖

Update

原来证明和判断可以用状态🌳来证明(和全数学相比可能不严谨,但是从CS的角度来说够用了)

先😴,回来再看

我们 (在完全不理解算法的前提下) 证明了 Sequential 内存模型下 Peterson’s Protocol 的 Safety。它能够实现互斥。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Dekker:
    flag = [False, False]
    turn = 0

    @thread
    def t1(self):
        this, another = 0, 1
        while True:
            self.flag[this] = True
            while self.flag[another]:
                if self.turn == another:
                    self.flag[this] = False
                    while self.turn == another:
                        pass
                    self.flag[this] = True
            cs = True
            del cs
            self.turn = another
            self.flag[this] = False
  

    @thread
    def t2(self):
        this, another = 1, 0
        while True:
            self.flag[this] = True
            while self.flag[another]:
                if self.turn == another:
                    self.flag[this] = False
                    while self.turn == another:
                        pass
                    self.flag[this] = True
            cs = True
            del cs
            self.turn = another
            self.flag[this] = False

    @marker
    def mark_t1(self, state):
        if localvar(state, 't1', 'cs'): return 'blue'

    @marker
    def mark_t2(self, state):
        if localvar(state, 't2', 'cs'): return 'green'

    @marker
    def mark_both(self, state):
        if localvar(state, 't1', 'cs') and localvar(state, 't2', 'cs'):
            return 'red'
  • 一些现状
    • 今天有非常坚 (内) 实 (卷) 的理论体系
    • 小心编译器和多处理器硬件(编译器优化,CPU指令流水线优化)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include "thread.h"

#define A 1
#define B 2

#define BARRIER __sync_synchronize()

atomic_int nested;
atomic_long count;

void critical_section() {
  long cnt = atomic_fetch_add(&count, 1);
  int i = atomic_fetch_add(&nested, 1) + 1;
  if (i != 1) {
    printf("%d threads in the critical section @ count=%ld\n", i, cnt);
    assert(0);
  }
  atomic_fetch_add(&nested, -1);
}

int volatile x = 0, y = 0, turn;

void TA() {
  while (1) {
    x = 1;                   BARRIER;
    turn = B;                BARRIER; // <- this is critcal for x86
    while (1) {
      if (!y) break;         BARRIER;
      if (turn != B) break;  BARRIER;
    }
    critical_section();
    x = 0;                   BARRIER;
  }
}

void TB() {
  while (1) {
    y = 1;                   BARRIER;
    turn = A;                BARRIER;
    while (1) {
      if (!x) break;         BARRIER;
      if (turn != A) break;  BARRIER;
    }
    critical_section();
    y = 0;                   BARRIER;
  }
}

int main() {
  create(TA);
  create(TB);
}
  • 课后思考:哪些 barrier 是多余的吗?
1
#define BARRIER __sync_synchronize() //重点研究这个宏的作用

(自动) 画状态机理解并发程序

Update

体育课累歇逼了,晚上给看完

  • 并发算法的设计困境

    • 不敢不画:谁知道有什么奇怪情况会发生?

    • 不敢乱画:画错了就都完了

  • 解决方法

    • 让电脑帮我们画(因为画状态机就是一个机械的事情)

model-checker.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import inspect, ast, astor, copy, sys
from pathlib import Path

threads, marker_fn = [], []

def thread(fn):
    '''Decorate a member function as a thread'''
    global threads
    threads.append(fn.__name__)
    return fn

def marker(fn):
    '''Decorate a member function as a state marker'''
    global marker_fn
    marker_fn.append(fn)

def localvar(s, t, varname):
    '''Return local variable value of thread t in state s'''
    return s.get(t, (0, {}))[1].get(varname, None)

def checkpoint():
    '''Instrumented `yield checkpoint()` goes here'''
    f = inspect.stack()[1].frame # stack[1] is the caller of checkpoint()
    return (f.f_lineno, { k: v for k, v in f.f_locals.items() if k != 'self' })

def hack(Class):
    '''Hack Class to instrument @mc.thread functions'''
    class Instrument(ast.NodeTransformer):
        def generic_visit(self, node, in_fn=False):
            if isinstance(node, ast.FunctionDef):
                if node.name in threads:
                    # a @mc.thread function -> instrument it
                    in_fn, node.decorator_list = True, []
                elif node.decorator_list:
                    # a decorated function like @mc.mark -> remove it
                    return None

            body = []
            for line in getattr(node, 'body', []):
                # prepend each line with `yield checkpoint()`
                if in_fn: body.append(
                    ast.Expr(ast.Yield(
                        ast.Call(func=ast.Name(checkpoint.__name__, ctx=ast.Load()),
                            args=[], keywords=[]))) )
                body.append(self.generic_visit(line, in_fn))
            node.body = body
            return node

    if not hasattr(Class, 'hacked'):
        hacked_ast = Instrument().visit(ast.parse(Class.source))
        hacked_src, vars = astor.to_source(hacked_ast), {}
        # set a breakpoint() here to see **magic happens**!
        exec(hacked_src, globals(), vars)
        Class.hacked, Class.hacked_src = vars[Class.__name__], hacked_src
    return Class

def execute(Class, trace):
    '''Execute trace (like [0,0,0,2,2,1,1,1]) on Class'''
    def attrs(obj):
        for attr in dir(obj):
            val = getattr(obj, attr)
            if not attr.startswith('__') and type(val) in [bool, int, str, list, tuple, dict]:
                yield attr, val

    obj = hack(Class).hacked()
    for attr, val in attrs(obj):
        setattr(obj, attr, copy.deepcopy(val))
 
    T = []
    for t in threads:
        fn = getattr(obj, t)
        T.append(fn()) # a generator for a thread
    S = { t: T[i].__next__() for i, t in enumerate(threads) }

    while trace:
        chosen, tname, trace = trace[0], threads[trace[0]], trace[1:]
        try:
            if T[chosen]:
                S[tname] = T[chosen].__next__()
        except StopIteration:
            S.pop(tname)
            T[chosen] = None

    for attr, val in attrs(obj):
        S[attr] = val
    return obj, S

class State:
    def __init__(self, Class, trace):
        self.trace = trace
        self.obj, self.state = execute(Class, trace)
        self.name = f's{abs(State.freeze(self.state).__hash__())}'

    @staticmethod
    def freeze(obj):
        '''Create an object's hashable frozen (immutable) counterpart'''
        if obj is None or type(obj) in [str, int, bool]:
            return obj
        elif type(obj) in [list, tuple]:
            return tuple(State.freeze(x) for x in obj)
        elif type(obj) in [dict]:
            return tuple(sorted(
                zip(obj.keys(), (State.freeze(v) for v in obj.values()))
            ))
        raise ValueError('Cannot freeze')

def serialize(Class, s0, vertices, edges):
    '''Serialize all model checking results'''
    print(f'CLASS({repr(Class.hacked_src)})')

    sid = { s0.name: 0 }
    def name(s):
        if s.name not in sid: 
            sid[s.name] = len(sid)
        return repr(f's{sid[s.name]}')

    for u in vertices.values():
        mk = [f(u.obj, u.state) for f in marker_fn if f(u.obj, u.state)]
        print(f'STATE({name(u)}, {repr(u.state)}, {repr(mk)})')

    for u, v, chosen in edges:
        print(f'TRANS({name(u)}, {name(v)}, {repr(threads[chosen])})')

def check_bfs(Class):
    '''Enumerate all possible thread interleavings of @mc.thread functions'''
    s0 = State(Class, trace=[])

    # breadth-first search to find all possible thread interleavings
    queue, vertices, edges = [s0], {s0.name: s0}, []
    while queue:
        u, queue = queue[0], queue[1:]
        for chosen, _ in enumerate(threads):
            v = State(Class, u.trace + [chosen])
            if v.name not in vertices:
                queue.append(v)
                vertices[v.name] = v
            edges.append((u, v, chosen))

    serialize(Class, s0, vertices, edges)

src, vars = Path(sys.argv[1]).read_text(), {}
exec(src, globals(), vars)
Class = [C for C in vars.values() if type(C) == type].pop()
setattr(Class, 'source', src)
check_bfs(Class)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Mutex:
    locked = ''

    @thread
    def t1(self):
        while True:
            while self.locked == '🔒':
                pass
            self.locked = '🔒'
            cs = True
            del cs
            self.locked = ''

    @thread
    def t2(self):
        while True:
            while self.locked == '🔒':
                pass
            self.locked = '🔒'
            cs = True
            del cs
            self.locked = ''

    @marker
    def mark_t1(self, state):
        if localvar(state, 't1', 'cs'): return 'blue'

    @marker
    def mark_t2(self, state):
        if localvar(state, 't2', 'cs'): return 'green'

    @marker
    def mark_both(self, state):
        if localvar(state, 't1', 'cs') and localvar(state, 't2', 'cs'):
            return 'red'
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Peterson:
    flag = '  '
    turn = ' '

    @thread
    def t1(self):
        while True:
            self.flag = '🏴' + self.flag[1]
            self.turn = '🏳'
            while self.flag[1] != ' ' and self.turn == '🏳':
                pass
            cs = True
            del cs
            self.flag = ' ' + self.flag[1]

    @thread
    def t2(self):
        while True:
            self.flag = self.flag[0] + '🏳'
            self.turn = '🏴'
            while self.flag[0] != ' ' and self.turn == '🏴':
                pass
            cs = True
            del cs
            self.flag = self.flag[0] + ' '

    @marker
    def mark_t1(self, state):
        if localvar(state, 't1', 'cs'): return 'blue'

    @marker
    def mark_t2(self, state):
        if localvar(state, 't2', 'cs'): return 'green'

    @marker
    def mark_both(self, state):
        if localvar(state, 't1', 'cs') and localvar(state, 't2', 'cs'):
            return 'red'
  • 使用例
1
python3 model-checker.py peterson-flag.py
  • 后面跟着model的参数脚本即可

  • 为什么输出模式这么反人类 $\Longrightarrow$ 因为输出是给程序看的,后面的python脚本可以写一些lambda表达式来提取信息 $\Longrightarrow$ 加到后期的工具visualize.py,直接可视化(很像之前zweix大佬的jyyslide-md)的markdown转html的感觉很像

  • 安装相关模块,除了pip的以外Linux的机器上要安装graphviz

1
2
sudo apt install graphviz
pip install graphviz
  • 然后通过管道通信和重定向
1
2
python3 model-checker.py peterson-flag.py | python3 visualize.py > a.html
open a.html

一个checker的demo

代码导读:Python Generator

死循环也能返回?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def numbers(init=0, step=1):
    n = init
    while True:
        n += step
        yield n
>>> g = numbers()
>>> g
<generator object numbers at 0x107f873c0>
>>> g.__next__()
1
>>> g.__next__()
2
  • yield让死循环返回,但又不是完全返回
/img/Operating System/chapter4-1.png
效果 $\Longrightarrow$ g就是一个状态机

Model Checker: 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class Mutex:
    locked = ''

    def T1(self):
        yield checkpoint()
        while True:
            yield checkpoint()
            while self.locked == '🔒':
                yield checkpoint()
                pass
            yield checkpoint()
            self.locked = '🔒'
            ...
1
2
thread_state = mutex_obj().T1() 
thread_state.__next__() # 单步执行一行; see: execute()

Model Checker: 实现 (cont’d)

什么是状态空间?

  • 所有可能的状态机执行序列
  • BFS 生成,合并重复状态
1
2
3
4
5
6
7
8
[0]      T1
[1]      T2
[0,0]    T1 -> T1
[0,1]    T1 -> T2
[0,0,0]  T1 -> T1 -> T1
[0,0,1]  T1 -> T1 -> T2
[0,1,0]  T1 -> T2 -> T1
...      ...

Model Checking 和工具的故事

Model checking is a method for formally verifying finite-state systems——只要能为系统建立模型,就能用 prove by brute-force 证明正确/找到错误。

Model checker 的一切就是状态机!

  • Safety: 红色的状态不可到达
    • G(V,E) 上的可达性问题
  • (Strong) Liveness: 从任意状态出发,都能到达绿/蓝色状态
    • G(V,E) 上的什么问题?
  • 如何展示这个状态机?
  • 如何能避免无效的探索?

更多的 Model Checker

真实程序的状态空间太大?


不满足于简单的内存模型?

总结

本次课回答的问题

  • Q: 如何理解各种并发程序?

Take-away message

  • 并发程序 = 状态机
    • 线程共享内存
    • 每一步非确定选择线程执行
  • 画状态机就对了
    • 当然,用工具帮你画 (model checker)

声明:本文章引用资料与图像均已做标注,如有侵权本人会马上删除