互斥与同步的概念
- 互斥:在一个进程中,有时候我们需要创建多个线程来完成任务,由于这多个线程是共享该进程的资源,那么就可能多个线程同时访问同一块资源,那么这块资源就称为“临界资源”,访问临界资源的代码称为“临界区”由于线程的调度是由操作系统决定的,所以不能确定线程的执行顺序,有可能在A线程访问临界资源的时候,对临界资源进行了修改,而此时还有一个B线程也在临界区内,但线程B不知道线程A已经修改了数据,就会造成数据二义性。互斥的意思就是在同一时刻内临界资源只能有一个线程访问,当有进程在访问临界资源时,其他进程只能等该进程访问结束后才可以访问临界资源。
- 同步:多个线程都要访问临界资源,有时候访问临界资源必须要有先后顺序,如A线程需要对临界资源进行处理,而B线程要从临界资源拿到A线程处理后的数据。那么B线程就必须等A线程访问临界资源后才可以访问。这就是同步。
基于锁实现互斥与同步
既然一次只能有一个线程访问临界资源,那我们就可以考虑给临界资源加上一把锁,每当有一个线程要开始临界资源,都要先查看锁的状态,如果锁已经处于锁住状态则表明现在有线程正在访问临界资源。当一个线程访问结束后再释放锁,这样其他线程就可以访问临界资源了。
下面介绍一中锁:互斥锁
pthread_mutex_t lock;
pthread_mutex_t是锁的类型,对锁的操作都是通过相关函数进行的。
初始化锁与销毁锁:
pthread_mutex_init();
pthread_mutex_destroy();
加锁:
pthread_mutex_lock();
解锁:
pthread_mutex_unlock();
下面实现一个互斥的例子,模拟抢票。
票务系统一次放出100张票,多个线程同时开始抢票。
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticket = 100;
pthread_mutex_t lock;
void* get_ticket(void* arg){
while(1){
if(ticket > 0){
usleep(1000);
printf("I am thread :%p, I get a ticket: %d\n",pthread_self(), ticket);
ticket--;
}
else{
break;
}
}
}
int main(){
pthread_t tid[5];
pthread_mutex_init(&lock, NULL);
int i = 0;
for(; i < 5; i++){
pthread_create(tid+i, NULL, get_ticket, (void*)&i);
}
for(i = 0; i < 5; i++){
pthread_join(tid[i], NULL);
}
pthread_mutex_destroy(&lock);
return 0;
}
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticket = 100;
pthread_mutex_t lock;
void* get_ticket(void* arg){
while(1){
usleep(1000);
pthread_mutex_lock(&lock);
if(ticket > 0){
usleep(1000);
printf("I am thread :%p, I get a ticket: %d\n",pthread_self(), ticket);
ticket--;
}
else{
pthread_mutex_unlock(&lock);
break;
}
pthread_mutex_unlock(&lock);
}
}
int main(){
pthread_t tid[5];
pthread_mutex_init(&lock, NULL);
int i = 0;
for(; i < 5; i++){
pthread_create(tid+i, NULL, get_ticket, (void*)&i);
}
for(i = 0; i < 5; i++){
pthread_join(tid[i], NULL);
}
pthread_mutex_destroy(&lock);
return 0;
}
基于条件变量和锁实现同步
要实现同步,就要有对应的等待条件,我们使用条件变量来与锁来实现同步。
phhread_t cond
定义一个条件变量,对条件变量的操作都是通过函数来实现的。
初始化与销毁条件变量:
pthread_cond_init();
pthread_cond_destroy();
pthread_cond_destroy();
参数:要销毁的条件变量。
在某个条件变量下等待
pthread_cond_wait();
因为同步访问的也都是临界资源,一个线程发现条件不满足时开始等待,当条件满足时该线程获取到信息,同时申请锁,继续在临界区进行访问。
通知某个条件变量下等待的线程继续工作
pthread_cond_signal();
我们通过生产者消费者模型来实现同步。
基于阻塞队列的生产者与消费者模型
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
使用C++来编写代码:
model.hpp:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#define NUM 5
class BlockQueue{
public:
BlockQueue(size_t cap = NUM):_cap(cap){
pthread_mutex_init(&_lock, NULL);
pthread_cond_init(&_full, NULL);
pthread_cond_init(&_empty, NULL);
}
~BlockQueue(){
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
bool isfull(){
return q.size() >= _cap;
}
bool isempty(){
return q.size() == 0;
}
void put(int& data){
pthread_mutex_lock(&_lock);
while(isfull()){
pthread_cond_signal(&_empty);
std::cout << "the queue is full... i am waiting!" << std::endl;
pthread_cond_wait(&_full, &_lock);
}
q.push(data);
pthread_mutex_unlock(&_lock);
}
void pop(){
pthread_mutex_lock(&_lock);
while(isempty()){
pthread_cond_signal(&_full);
std::cout << "the queue is empty.. i am waiting!" << std::endl;
pthread_cond_wait(&_empty, &_lock);
}
int data = q.front();
q.pop();
pthread_mutex_unlock(&_lock);
std::cout << "i got you! :" << data << " * 2 = " << data * 2 << std::endl;
}
private:
size_t _cap;
std::queue<int> q;
pthread_cond_t _full;
pthread_cond_t _empty;
pthread_mutex_t _lock;
};
model.cc
#include "model.hpp"
void* producer(void* arg){
BlockQueue* bq = (BlockQueue*)arg;
while(1){
int data = rand() % 100;
bq->put(data);
sleep(1);
}
}
void* consumer(void* arg){
BlockQueue* bq = (BlockQueue*)arg;
while(1){
bq->pop();
sleep(1);
}
}
int main(){
BlockQueue bq;
pthread_t a, b;
pthread_create(&a, NULL, producer, (void*)&bq);
pthread_create(&b, NULL, consumer, (void*)&bq);
pthread_join(a, NULL);
pthread_join(b, NULL);
return 0;
}
运行结果:
什么是信号量?
对于临界资源,我们最开始的认知是一次只能有一个线程访问,那有没有这种情况,临界资源很大,一个线程只访问其中的一部分,而另外一个线程访问临界资源的另一部分,两个线程互不干扰,这样可不可以实现多个线程同时访问一块临界资源呢?
答案当然是可以的。
信号量可以简单理解为一个计数器。记录当前临界资源还可以被多少个线程访问。
sem_t sem;
定义一个信号量。
初始化与销毁信号量:
sem_init();
sem_destroy();
sem_wait();
sem_post();
下面我们还是通过生产者与消费者模型来理解信号量。
基于信号量的生产者与消费者模型
我们这次使用vector来作为底层容器,因为vector支持下标访问,便于操作。
该模型由于使用信号量实现,所以生产者与消费者会同时访问临界资源,但是互不干扰。过程可理解为一个环形。
head放数据,tail拿数据。如果head速度慢,tail会追上head,此时应该tail停止(等待),head放过数据后,tail才可以拿数据。如果tail速度慢,head会追上tail,此时head应该停止(等待),tail拿过数据后,有空的地方,head才能继续放数据。
代码实现:
Ring.hpp
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <unistd.h>
#define NUM 5
class RingQueue{
public:
RingQueue(int cap = NUM) :_cap(cap), _consumerStep(0), _producerStep(0){
_v.resize(_cap);
sem_init(&_dataSem, 0, 0);
sem_init(&_spaceSem, 0, cap);
}
~RingQueue(){
sem_destroy(&_dataSem);
sem_destroy(&_spaceSem);
}
void push(int& data){
sem_wait(&_spaceSem);
_v[_producerStep++] = data;
std::cout << "I put a data, you can get it..." << std::endl;
_producerStep %= _cap;
sem_post(&_dataSem);
}
void pop(){
sem_wait(&_dataSem);
int data = _v[_consumerStep++];
_consumerStep %= _cap;
std::cout << "I get a data: " << data << " * 2 = " << data * 2 << std::endl;
sem_post(&_spaceSem);
}
private:
std::vector<int> _v;
int _cap;
sem_t _dataSem;
sem_t _spaceSem;
int _consumerStep;
int _producerStep;
};
Ring.cc
#include "RingQueue.hpp"
void* producer(void* arg){
RingQueue* rq = (RingQueue*)arg;
while(true){
int data = rand() % 100;
rq->push(data);
}
}
void* consumer(void* arg){
RingQueue* rq = (RingQueue*)arg;
while(true){
rq->pop();
sleep(1);
}
}
int main(){
pthread_t a, b;
RingQueue rq;
pthread_create(&a, NULL, consumer, (void*)&rq);
pthread_create(&b, NULL, producer, (void*)&rq);
pthread_join(a, NULL);
pthread_join(b, NULL);
}
运行结果:
基于信号量的生产者消费者模型的效率要大于基于条件变量和互斥锁的生产者消费者模型。因为在这个模型中,生产者和消费者可以同时访问临界资源。