博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
并发编程(一)线程基础,线程之间的共享协作
阅读量:4946 次
发布时间:2019-06-11

本文共 18317 字,大约阅读时间需要 61 分钟。

1.基础概念

1.1 什么是进程和线程

进程是程序运行资源分配的最小单位。

其中资源包括:CPU、内存空间、磁盘IO等,同一进程中的多条线程共享该进程中的全部系统资源,而进程和进程之间是相互独立的。进程可以分为系统进程和用户进程。用于完成操作系统的各种功能的进程就是系统进程,他们就是处于运行状态下的操作系统本身。用户进程就是由你所启动的进程。

线程是cpu调度的最小单位,必须依赖于进程存在。

线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的、能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。

线程无处不在

任何一个程序都必须要创建线程,特别是Java不管任何程序都必须启动一个main函数的主线程; Java Web开发里面的定时任务、定时器、JSP和 Servlet、异步消息处理机制,远程访问接口RM等,任何一个监听事件, onclick的触发事件等都离不开线程和并发的知识。

1.2 CPU核心数和线程数的关系

多核心:指单芯片多处理器。

多线程:让同一个处理器上的多个线程同步执行并共享处理器的执行资源

核心数、线程数关系:目前主流CPU都是多核的。增加核心数目就是为了增加线程数,因为操作系统是通过线程来执行任务的,一般情况下它们是1:1对应关系,也就是说四核CPU一般拥有四个线程。但 Intel引入超线程技术后,使核心数与线程数形成1:2的关系

1.3 CPU时间片轮转机制

时间片轮转调度是一种最古老、最简单、最公平且使用最广的算法,又称RR调度。每个进程被分配一个时间段,称作它的时间片,即该进程允许运行的时间。

时间片设得太短会导致过多的进程切换,降低了CPU效率:而设得太长又可能引起对短的交互请求的响应变差。将时间片设为100ms通常是一个比较合理的折衷。

1.4 并行和并发

并发:指应用能够交替执行不同的任务,比如单CPU核心下执行多线程并非是同时执行多个任务,如果你开两个线程执行,就是在你几乎不可能察觉到的速度不断去切换这两个任务,已达到"同时执行效果",其实并不是的,只是计算机的速度太快,我们无法察觉到而已.

当谈论并发的时候一定要加个单位时间,也就是说单位时间内并发量是多少。离开了单位时间其实是没有意义的

并行:指应用能够同时执行不同的任务,例:吃饭的时候可以边吃饭边打电话,这两件事情可以同时执行

两者区别:一个是交替执行,一个是同时执行.

1.5 高并发编程的优点及注意事项

优点:

(1)充分利用CPU的资源

(2)加快响应用户的时间

(3)可以使你的代码模块化,异步化,简单化

注意事项:

(1)线程之间的安全性

在同一个进程里面的多线程是资源共享的,也就是都可以访问同一个内存地址当中的一个变量。例如:若每个线程中对全局变量、静态变量只有读操作,而无写操作,一般来说,这个全局变量是线程安全的:若有多个线程同时执行写操作,一般都需要考虑线程同步,否则就可能影响线程安全。

(2)线程之间的死锁

为了解决线程之间的安全性引入了Java的锁机制,而一不小心就会产生Java线程死锁的多线程问题,因为不同的线程都在等待那些根本不可能被释放的锁,从而导致所有的工作都无法完成。假设有两个线程,分别代表两个饥饿的人,他们必须共享刀叉并轮流吃饭。他们都需要获得两个锁:共享刀和共享叉的锁。

假如线程A获得了刀,而线程B获得了叉。线程A就会进入阻塞状态来等待获得叉,而线程B则阻塞来等待线程A所拥有的刀。这只是人为设计的例子,但尽管在运行时很难探测到,这类情况却时常发生

(3)线程太多了会将服务器资源耗尽形成死机当机

线程数太多有可能造成系统创建大量线程而导致消耗完系统内存以及CPU的“过渡切换”,造成系统的死机。

OS系统对线程的数量是有限制的:Linux下默认线程数量不超过1000,而windows下默认线程数量不超过2000.当创建一个新的线程时,会为其分配一个栈空间(约1M大小),以及对应的文件描述符(句柄->指针)

某些系统资源是有限的,如文件描述符。多线程程序可能耗尽资源,因为每个线程都可能希望有一个这样的资源。如果线程数相当大,或者某个资源的侯选线程数远远超过了可用的资源数则最好使用资源池。一个最好的示例是数据库连接池。只要线程需要使用一个数据库连接,它就从池中取出一个,使用以后再将它返回池中。资源池也称为资源库。

参考:http://enjoy.ke.qq.com

2.Java线程

2.1 线程的启动与终止

线程的启动

Java中线程启动的方式有两种

1、X extends Thread;,然后X.run

2、X implements  Runnable;然后交给Thread运行

参考:java.lang.Thread[1.8]

There are two ways to create a new thread of execution. One is to * declare a class to be a subclass of Thread. This * subclass should override the run method of class * Thread. An instance of the subclass can then be * allocated and started. For example, a thread that computes primes * larger than a stated value could be written as follows: * 

 *     class PrimeThread extends Thread { *         long minPrime; *         PrimeThread(long minPrime) { *             this.minPrime = minPrime; *         } * *         public void run() { *             // compute primes larger than minPrime *              . . . *         } *     } * 

*

* The following code would then create a thread and start it running: *

 *     PrimeThread p = new PrimeThread(143); *     p.start(); * 
*

* The other way to create a thread is to declare a class that * implements the Runnable interface. That class then * implements the run method. An instance of the class can * then be allocated, passed as an argument when creating * Thread, and started. The same example in this other * style looks like the following: *


 *     class PrimeRun implements Runnable { *         long minPrime; *         PrimeRun(long minPrime) { *             this.minPrime = minPrime; *         } * *         public void run() { *             // compute primes larger than minPrime *              . . . *         } *     } * 

*

* The following code would then create a thread and start it running: *

 *     PrimeRun p = new PrimeRun(143); *     new Thread(p).start(); * 
*

Thread是Java里对线程的唯一抽象,Runnable只是对任务(业务逻辑)的抽象。Thread可以接受任意一个Runnable的实例并执行。

线程的终止:

要么是run执行完成了,要么是抛出了一个未处理的异常导致线程提前结束。

stop

暂停、恢复和停止操作对应在线程Thread的API就是suspend()resume()stop()。但是这些API是过期的,也就是不建议使用的。不建议使用的原因主要有:以suspend()方法为例,在调用后,线程不会释放已经占有的资源(比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。同样,stop()方法在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能工作在不确定状态下。正因为suspend()、resume()和stop()方法带来的副作用,这些方法才被标注为不建议使用的过期方法。

中断

安全的中止则是其他线程通过调用某个线程A的interrupt()方法对其进行中断操作, 中断好比其他线程对该线程打了个招呼,“A,你要中断了”,不代表线程A会立即停止自己的工作,同样的A线程完全可以不理会这种中断请求。因为java里的线程是协作式的,不是抢占式的。线程通过检查自身的中断标志位是否被置为true来进行响应,

线程通过方法isInterrupted()来进行判断是否被中断,也可以调用静态方法Thread.interrupted()来进行判断当前线程是否被中断,不过Thread.interrupted()会同时将中断标识位改写为false。

如果一个线程处于了阻塞状态(如线程调用了thread.sleep、thread.join、thread.wait等),则在线程在检查中断标示时如果发现中断标示为true,则会在这些阻塞方法调用处抛出InterruptedException异常,并且在抛出异常后会立即将线程的中断标示位清除,即重新设置为false。

不建议自定义一个取消标志位来中止线程的运行。因为run方法里有阻塞调用时会无法很快检测到取消标志,线程必须从阻塞调用返回后,才会检查这个取消标志。这种情况下,使用中断会更好,因为,

一、一般的阻塞方法,如sleep等本身就支持中断的检查,

二、检查中断位的状态和检查取消标志位没什么区别,用中断位的状态还可以避免声明取消标志位,减少资源的消耗。

注意:处于死锁状态的线程无法被中断

2.2 run()和start()

Thread是java对线程概念进行的抽象。new Thread() 只是创建一个Thread的实例,并没有和系统中真正的线程联系到一起。只有在执行了start()之后,才实现了真正意义上的线程启动。

public synchronized void start() {        /**         * This method is not invoked for the main method thread or "system"         * group threads created/set up by the VM. Any new functionality added         * to this method in the future may have to also be added to the VM.         *         * A zero status value corresponds to state "NEW".         */        if (threadStatus != 0)            throw new IllegalThreadStateException();        /* Notify the group that this thread is about to be started         * so that it can be added to the group's list of threads         * and the group's unstarted count can be decremented. */        group.add(this);        boolean started = false;        try {            start0();            started = true;        } finally {            try {                if (!started) {                    group.threadStartFailed(this);                }            } catch (Throwable ignore) {                /* do nothing. If start0 threw a Throwable then                  it will be passed up the call stack */            }        }    }private native void start0();

start()方法让一个线程进入就绪队列等待分配cpu,分到cpu后才调用实现的run()方法,start()方法不能重复调用,如果重复调用会抛出异常。

run()方法是业务逻辑实现的地方,本质上和任意一个类的任意一个成员方法并没有任何区别,可以重复执行,也可以被单独调用。

2.3 线程的状态和其他常用方法

线程状态

线程在新建完成执行start()后进入的是就绪状态,此时在等待时间片的分配。

yield()方法:使当前线程让出CPU占有权,但让出的时间是不可设定的。也不会释放锁资源。注意:因为又不是每个线程都需要这个锁的,而且执行yield( )的线程不一定就会持有锁。

所有执行yield()的线程有可能在进入到就绪状态后会被操作系统再次选中马上又被执行。

join方法

把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行。比如在线程B中调用了线程A的Join()方法,直到线程A执行完毕后,才会继续执行线程B。

public class UseJoin {    private static class JoinThread implements Runnable {        @Override        public void run() {            System.out.println(Thread.currentThread().getName() + "is start!");            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println(Thread.currentThread().getName() + "is end!");        }    }    public static void main(String[] args) throws InterruptedException {        Thread main = Thread.currentThread();        Thread joinThread = new Thread(new JoinThread());        joinThread.setName("Join Thread");        joinThread.start();        joinThread.join();        System.out.println("main start...");        Thread.sleep(2000);        System.out.println("main end ...");    }}

打开

joinThread.join(); 结果如下:

关闭

joinThread.join(); 结果如下:

线程的优先级

在Java线程中,通过一个整型成员变量priority来控制优先级,优先级的范围从1~10,在线程构建的时候可以通过setPriority(int)方法来修改优先级,默认优先级是5,优先级高的线程分配时间片的数量要多于优先级低的线程。

设置线程优先级时,针对频繁阻塞(休眠或者I/O操作)的线程需要设置较高优先级,而偏重计算(需要较多CPU时间或者偏运算)的线程则设置较低的优先级,确保处理器不会被独占。在不同的JVM以及操作系统上,线程规划会存在差异,有些操作系统甚至会忽略对线程优先级的设定。

守护线程

Daemon(守护)线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作。这意味着,当一个Java虚拟机中不存在Daemon线程的时候,Java虚拟机将会退出。可以通过调用Thread.setDaemon(true)将线程设置为Daemon线程。我们一般用不上,比如垃圾回收线程就是Daemon线程。(进程停止,守护线程也将结束)

Daemon线程被用作完成支持性工作,但是在Java虚拟机退出时Daemon线程中的finally块并不一定会执行。在构建Daemon线程时,不能依靠finally块中的内容来确保执行关闭或清理资源的逻辑。

3.线程间的共享协作

线程间的共享

3.1 synchronized 内置锁

线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,包括数据之间的共享,协同处理事情。这将会带来巨大的价值。

Java支持多个线程同时访问一个对象或者对象的成员变量,关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制。

对象锁和类锁:

对象锁是用于对象实例方法,或者一个对象实例上的,类锁是用于类的静态方法或者一个类的class对象上的。我们知道,类的对象实例可以有很多个,但是每个类只有一个class对象,所以不同对象实例的对象锁是互不干扰的,但是每个类只有一个类锁。

但是有一点必须注意的是,其实类锁只是一个概念上的东西,并不是真实存在的,类锁其实锁的是每个类的对应的class对象。类锁和对象锁之间也是互不干扰的。

(①.用于方法上->同步方法->隐式的使用 this->同步方法 ②.使用对象用于代码块上->同步块->对象锁 ③.使用在代码块上 显示的使用this ->对象锁 ->当前的类对象)

类锁本质上也是对象锁,对象为jvm中的唯一的class对象

不同的线程锁 锁不同的对象时,线程是可以并行执行的;锁相同对象时 会产生竞争谁抢到 对象(锁)谁就先执行。

3.2 Volatile 最轻量的同步机制:

volatile保证了不同线程对这个变量进行操作时的可见性,一个线程修改了某个变量的值,这个 新值对其他线程而言 也是立即可见的。

只能保证可见性不能保证一致性

适用场景:一写多度 

3.3 ThreadLocal辨析

与synchonized的比较

都用于解决多线程并发访问。

synchionized是利用锁机制,使变量或者代码块 在某一时刻仅仅只能被一个线程访问。

ThrealLocal则是 为每个线程提供一个变量的副本,使得每个线程在某一时间访问到的并非同一个对象,从而隔离了多个线程对数据的共享。

 图片来源:http://enjoy.ke.qq.com

 源码解析:

 
/**  * Returns the value in the current thread's copy of this  * thread-local variable.  If the variable has no value for the  * current thread, it is first initialized to the value returned  * by an invocation of the {@link #initialValue} method.  *  * @return the current thread's value of this thread-local//拿到当前线程独有的threadLocalMap  */ public T get() {
Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) {
@SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
public void set(T value) {
//首先获取当前线程 Thread t = Thread.currentThread(); //以线程为key 获取到一个ThrealLocalMap ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } //ThreadLocalMap是ThreadLocal的静态内部类
ThreadLocalMap getMap(Thread t) {
return t.threadLocals; } //Thread类中有一个ThreadLocalMap成员变量,所以getMap(Thread t)是直接返回线程的该成员
ThreadLocal.ThreadLocalMap threadLocals = null;
/**  * ThreadLocalMap is a customized hash map suitable only for  * maintaining thread local values. No operations are exported  * outside of the ThreadLocal class. The class is package private to  * allow declaration of fields in class Thread.  To help deal with  * very large and long-lived usages, the hash table entries use  * WeakReferences for keys. However, since reference queues are not  * used, stale entries are guaranteed to be removed only when  * the table starts running out of space.  */ static class ThreadLocalMap {
/** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ static class Entry extends WeakReference
> { //继承自WeakReference (弱引用)有gc发生时,就会立即清除 /** The value associated with this ThreadLocal. */ Object value; //类似于map的key value结构,key就是ThreadLocal value就是需要隔离的访问变量 Entry(ThreadLocal
k, Object v) {
super(k); value = v; } } /** * The initial capacity -- MUST be a power of two. */ private static final int INITIAL_CAPACITY = 16; /** * The table, resized as necessary. * table.length MUST always be a power of two.//长度需要永远是2的幂 */ private Entry[] table;//使用Entry数组 ,应为一个ThreadLocal可能会对应着多个变量需要进行隔离
/**  * Get the entry associated with key.  This method  * itself handles only the fast path: a direct hit of existing  * key. It otherwise relays to getEntryAfterMiss.  This is  * designed to maximize performance for direct hits, in part  * by making this method readily inlinable.  *  * @param  key the thread local object  * @return the entry associated with key, or null if no such  */ private Entry getEntry(ThreadLocal
key) {
int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); }
/**  * Set the value associated with key.  *  * @param key the thread local object  * @param value the value to be set  */ private void set(ThreadLocal
key, Object value) {
// We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
ThreadLocal
k = e.get(); if (k == key) {
e.value = value; return; } if (k == null) {
replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } /** * Remove the entry for key. */ private void remove(ThreadLocal
key) {
Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear(); expungeStaleEntry(i); return; } } }
/**  * Replace a stale entry encountered during a set operation  * with an entry for the specified key.  The value passed in  * the value parameter is stored in the entry, whether or not  * an entry already exists for the specified key.  *  * As a side effect, this method expunges all stale entries in the  * "run" containing the stale entry.  (A run is a sequence of entries  * between two null slots.)  *  * @param  key the key  * @param  value the value to be associated with key  * @param  staleSlot index of the first stale entry encountered while  *         searching for key.  */ private void replaceStaleEntry(ThreadLocal
key, Object value, int staleSlot) {
Entry[] tab = table; int len = tab.length; Entry e; // Back up to check for prior stale entry in current run. // We clean out whole runs at a time to avoid continual // incremental rehashing due to garbage collector freeing // up refs in bunches (i.e., whenever the collector runs). int slotToExpunge = staleSlot; for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i; // Find either the key or trailing null slot of run, whichever // occurs first for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal
k = e.get(); // If we find key, then we need to swap it // with the stale entry to maintain hash table order. // The newly stale slot, or any other stale slot // encountered above it, can then be sent to expungeStaleEntry // to remove or rehash all of the other entries in run. if (k == key) {
e.value = value; tab[i] = tab[staleSlot]; tab[staleSlot] = e; // Start expunge at preceding stale entry if it exists if (slotToExpunge == staleSlot) slotToExpunge = i; cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } // If we didn't find stale entry on backward scan, the // first stale entry seen while scanning for key is the // first still present in the run. if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; } // If key not found, put new entry in stale slot tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value); // If there are any other stale entries in run, expunge them if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); }

ThreadLocal引发内存泄漏

实际每个线程都维护一个ThreadLocalMap,由于ThreadLocalMap中使用ThreadLocal弱引用作为key ,当垃圾收集器工作时,无论当前内存是够足够,都会回收掉只被弱引用管理的对象实例。

当GC发生时,threadLocal就被回收变为null,这样ThreadLocalMap中就出现key为null的Entry,这些Entry中的value无法被访问(key->null 没有对null的引用),如果当前线程迟迟不结束的话,这些value就会一直存在一条强引用链:

Thread Ref->Thread->ThreadLocalMap->Entry->value 而这块value永远不会被访问到 所以存在内存泄漏。

只有当前thread结束以后,current thread就不会存在栈中,强引用断开,Current Thread、Map value将全部被GC回收。最好的做法是不在需要使用ThreadLocal变量后,都调用它的remove()方法,清除数据。

总结:

  JVM利用设置ThreadLocalMap的key为弱引用,来避免内存泄漏

  JVM利用remove、get、set方法的时候,回收弱引用 

  当ThreadLocal存储很多Key为null的Entry的时候,而不再去调用remove、get、set方法,那么将导致内存泄漏。

  使用线程池+ ThreadLocal时要小心,因为这种情况下,线程是一直在不断的重复运行的,从而也就造成了value可能造成累积的情况

ThreadLocal错误使用:

threadLocalMap中保存的如果是同一个对象的引用时 不同线程修改是会修改同一份实例,导致无法实现隔离,如:

public static Number number = new Number(0);public static ThreadLocal
value = new ThreadLocal
() { };

参考:http://enjoy.ke.qq.com

线程间的协作:

线程之间相互配合,完成工作

等待/通知机制

指一个线程A调用对象O的wait()方法进入等待状态,当另一个线程B调用对象O的nofity()或者notifyAll()方法后,线程A就收到通知 从对象O的wait()方法返回,进而执行后续操作。

A和B线程需要通过共同的对象O来完成交互,对象上的wait()和notify/notifyAll()的关系就像开关信号一样,用来完成等待方和通知方之间的相互工作。

notify():

通知一个在对象上等待的线程,使其从wait方法返回,而返回的前提是 该线程获取到了对象的锁,如果没有获得到锁 线程重新进入到waiting状态

notifyAll():

通知所有等待在该对象上的线程

wait():

调用该方法的线程进入waiting状态,只有等待另外的线程通知或者被中断才会返回,当调用wati()方法后,会释放对象的锁

wait(long)

超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回

wait (long,int)

对于超时时间更细粒度的控制,可以达到纳秒

等待方遵循如下原则:

1).获取对象的锁

2) 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件

3)条件满足,执行对应的逻辑

synchronized (obj){            while (条件不满足){                obj.wait();            }            //对应的逻辑处理            doSomething();        }

通知方遵循原则:

1)获得对象的锁

2)改变条件

3)通知所有等待的对象上的线程

public  synchronized  void changeSite(){        this.site = "BeiJing";//改变条件        notifyAll();//通知    }

 在调用wait(),notify()系列方法之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用wait()方法,notify()系列方法。

 

yield() 、sleep()、wait()、notify()等方法对锁的影响:

1)yield() 、sleep()被调用后,都不会释放当前线程所持有的锁。

2)调用wait()方法后,会释放当前线程持有的锁,而且当前被唤醒后,会重新去竞争锁,锁竞争到后才会执行wait方法后面的代码。

3)调用notify()系列方法后,对锁无影响,线程只有在syn同步代码执行完后才会自然而然的释放锁,所以notify()系列方法一般都是放在syn同步代码的最后一行。

 

转载于:https://www.cnblogs.com/cangshublogs/p/10721862.html

你可能感兴趣的文章