这里用的是Linux的pthread线程库,需要加pthread线程库。
线程的创建
第一个参数是线程id的地址。第二个参数是线程属性,一般为NULL。第三个是要执行的函数。第四个是函数的参数,一般也为NULL
线程的等待,第一个参数是线程的id,第二个一般为NULL,表示不关心退出的状态。如果主线程不等待join的话,那main函数就直接退出了。
线程的非正常终止
第一个情况就是没有join函数的效果。主线程退出子线程就终止了,这就是和进程不一样的地方。
第二个就是如果子线程溢出(比如delete内存两次),整个进程终止。
这些都是说明线程健壮性不够进程好。子线程会影响整个进程。
怎么让线程正常终止?
线程可以简单的从线程函数中返回,返回值是线程的退出码。
子线程可以return 0,或者return (void*) 1返回。因为子线程要求返回的是void() 。 0代表的就是空。
子线程可以被同一进程的其他线程用**pthread_cancel(thid)取消
子线程可以调用pthread_exit(0或void 1)**取消。
那么return 0和pthread_exit(0)区别是:如果子线程主函数又调用其他函数,在其他函数用return 0只会终止其他函数,子线程不会终止;而pthread_exit(0)线程也会终止。
线程参数的传递
1.创建的多个线程并不知道哪个线程先运行;
2.由于1,导致全局变量不能作为子线程的参数。在多进程中,全局变量可以
使用不用传递参数。
所以应该用第四个参数来给线程传递参数。比如全局变量var,第四个参数是
&var,这样是不对的。应该直接传var的值,然后强制转换成(void *) var。
这种传递参数的方法有价值吗?有,具体应用场景看后面,
上面只能传一个参数,如何传多个参数?
传地址参数。但是要保证给每个线程传一个地址,不能给多个线程传一个地址。
注意在线程主函数中把申请的内存释放掉。
这也不是一个好办法,正确的是把多个参数放在结构体中,把结构体地址传进去就可以,
线程的退出状态
和传参数一样的。在join中第二个参数传递。具体的再学习一下?
线程资源的回收
回顾一下进程资源的回收:子进程退出向父进程发送sigchild信号。父进程不处理这个信号就会产生僵尸进程。
如何避免产生僵尸进程:1、程序中显示的调用signal(SIGCHLD, SIG_IGN)来忽略SIGCHLD信号,这样子进程结束后,由内核来wai和释放资源
2、 fork两次,第一次fork的子进程在fork完成后直接退出,这样第二次fork得到的子进程就没有爸爸了,它会自动被老祖宗init收养,init会负责释放它的资源,这样就不会有“僵尸”产生了
3、一般使用信号的方式来处理,在收到SIGCHLD信号的时候,在信号处理函数中调用wait操作来释放他们的资源。
线程分离
子线程退出时,没有释放全部资源。
线程同步
互斥锁:
先声明互斥锁:pthread_mutex_t mutex;
初始化: pthread_mutex_init(&mutex,NULL)
加锁 pthread_mutex_lock(&mutex); 会阻塞等待
临界区
解锁 pthread_mutex_unlock(&mutex);
释放锁:pthread_mutex_destroy(&mutex);
互斥锁加锁失败后,会从用户态陷入到内核态,让内核帮助我们切换线程,虽然简化了使用锁的难度,但是存在一定的性能开销成本。
性能开销成本:两次线程上下文切换的成本。
1、当线程加锁失败时,内核将线程的状态从【运行】切换到睡眠状态,然后把CPU切换给其他线程运行;
2、当锁被释放时,之前睡眠状态的线程会变成就绪状态,然后内核就会在合适的时间把CPU切换给该线程运行;
自旋锁:主要用于等待时间很短的场景。
自旋锁通过CPU提供的CAS,在用户态完成加锁和解锁操作,不会主动产生线程上下文切换,所以相比互斥锁来说,会快一些开销小一些。使用自旋锁的时候,当发生多线程竞争锁的情况,加锁失败的线程会忙等待,直到拿到锁。忙等待可以通过while循环实现,不过最好是使用CPU提供的PAUSE指令来实现。
声明锁:pthread_spinlock_t mutex;
初始化:int pthread_spin_init();
int pthread_spin_lock();
读写锁
允许更高的并发性。写锁只能加到不加锁的代码,读锁只能加到读锁上。
时候读远大于写的场景,不会阻塞并发读。在linux优先考虑读锁,有可能导致写入线程饿死的情况。
条件变量
与互斥锁一起使用。实现生产消费者模型。
用互斥锁+条件变量实现生产消费者模型
pthread_cond_wait(&cond,&mutex)
这个函数的步骤
1.把互斥锁解锁;2阻塞,等待条件(被唤醒);3、条件被触发+给互斥锁加锁。
声明一个结构体缓存队列的消息
struct message
{
int id;
char mes[1000];
}
vector vm;
声明并初始化条件变量和互斥锁
void in(int sig); 数据入队
void* out(void *arg); 数据出队
int main()
{
signal(15,in); 接收15的信号,调用生产者函数;
pthread_t thid1,thid2,thid3; 创建三个消费线程;
pthread_create(&thid1,NULL,out,NULL);
}
void in(int sig)
{
static int mesgid=1; //消息计数器
struct message m;
memset(&m,0,sizeof(message);
给缓存队列加锁;
生产数据; m.mesgid=mesgid++; vm.push_back(m);
给缓存队列解锁;
pthread_cond_broadcast(&cond); 发送条件信号,激活全部线程
}
void* out(void *arg)
{
struct message m; //用于存放出队的消息;
while(true)
{
给缓存队列加锁;
while(vm.size()==0)
{
pthread_cond_wait(&cond,&mutex);如果队列为空,释放锁,等待信号。while可以防止虚假唤醒。
}
从缓存队列取第一条记录,删除该记录
memcpy(&m,&vm[0],sizeof(struct message));
vm.erase(vm.begin());
给缓存队列解锁;
业务处理代码;
}
}
开发多线程的服务端程序
目的:实现多个客户端交换信息的简单聊天程序
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <pthread.h>
#define BUF_SIZE 100
#define MAX_COUNT 256
void* handleClient(void* arg);
void sendMsg(char* msg, int len);
void errorHandling(const char* msg);
int clientCount = 0;
int clientSocks[MAX_COUNT];
pthread_mutex_t mutex;
int main(int argc, char* argv[])
{
int servSock, clientSock;
struct sockaddr_in servAddr, clientAddr;
int clientAddrSize;
pthread_t threadID;
if(2 != argc)
{
printf("Usage: %s <port>\n", argv[0]);
exit(1);
}
pthread_mutex_init(&mutex, NULL);
servSock = socket(PF_INET, SOCK_STREAM, 0);
if(-1 == servSock)
errorHandling("socket() error!");
memset(&servAddr, 0, sizeof(servAddr));
servAddr.sin_family = AF_INET;
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
servAddr.sin_port = htons(atoi(argv[1]));
if(-1 == bind(servSock, (struct sockaddr*)&servAddr, sizeof(servAddr)))
errorHandling("bind() error!");
if(-1 == listen(servSock, 5))
errorHandling("listen() error!");
while(1)
{
clientAddrSize = sizeof(clientAddr);
clientSock = accept(servSock, (struct sockaddr*)&clientAddr, &clientAddrSize);
if(-1 == clientSock)
errorHandling("accept() error!");
pthread_mutex_lock(&mutex);
clientSocks[clientCount++] = clientSock;
pthread_mutex_unlock(&mutex);
pthread_create(&threadID, NULL, handleClient, (void*)&clientSock);
pthread_detach(threadID); //线程结束后自动销毁内存
printf("Connected client IP:%s\n", inet_ntoa(clientAddr.sin_addr));
}
close(servSock);
return 0;
}
void* handleClient(void *arg)
{
int clientSock = *((int*)arg);
int strLen = 0;
int i;
char msg[BUF_SIZE];
while((strLen = read(clientSock, msg, sizeof(msg))) != 0)
sendMsg(msg, strLen);
pthread_mutex_lock(&mutex);
for(i = 0; i < clientCount; i++)
{
if(clientSock == clientSocks[i])
{
while(i++ < (clientCount - 1))
clientSocks[i] = clientSocks[i+1];//数组中删除客户端套接字
break;
}
}
clientCount--;
pthread_mutex_unlock(&mutex);
close(clientSock);
return NULL;
}
void sendMsg(char *msg, int len) //send to all
{
int i;
pthread_mutex_lock(&mutex);
for(i = 0; i < clientCount; i++)
write(clientSocks[i], msg, len);
pthread_mutex_unlock(&mutex);
}
void errorHandling(const char *msg)
{
fputs(msg, stderr);
fputc('\n', stderr);
exit(1);
}