超强、超详细Redis数据库入门教程

【redis是什么】

redis是一个开源的、使用C语言编写的、支持网络交互的、可基于内存也可持久化的Key-Value数据库。

redis的官网地址,非常好记,是redis.io。(特意查了一下,域名后缀io属于国家域名,是british Indian Ocean territory,即英属印度洋领地)

目前,Vmware在资助着redis项目的开发和维护。

【redis的作者何许人也】

开门见山,先看照片:

是不是出乎了你的意料,嗯,高手总会有些地方与众不同的。

这位便是redis的作者,他叫Salvatore Sanfilippo,来自意大利的西西里岛,现在居住在卡塔尼亚。目前供职于Pivotal公司。

他使用的网名是antirez,如果你有兴趣,可以去他的博客逛逛,地址是antirez.com,当然也可以去follow他的github,地址是http://github.com/antirez。

【谁在使用redis】

Blizzard、digg、stackoverflow、github、flickr …

【学会安装redis】

从redis.io下载最新版redis-X.Y.Z.tar.gz后解压,然后进入redis-X.Y.Z文件夹后直接make即可,安装非常简单。

make成功后会在src文件夹下产生一些二进制可执行文件,包括redis-server、redis-cli等等:

$ find . -type f -executable
./redis-benchmark //用于进行redis性能测试的工具
./redis-check-dump //用于修复出问题的dump.rdb文件
./redis-cli //redis的客户端
./redis-server //redis的服务端
./redis-check-aof //用于修复出问题的AOF文件
./redis-sentinel //用于集群管理

【学会启动redis】

启动redis非常简单,直接./redis-server就可以启动服务端了,还可以用下面的方法指定要加载的配置文件:

./redis-server ../redis.conf

默认情况下,redis-server会以非daemon的方式来运行,且默认服务端口为6379。

有关作者为什么选择6379作为默认端口,还有一段有趣的典故,英语好的同学可以看看作者这篇博文中的解释。

【使用redis客户端】

我们直接看一个例子:

//这样来启动redis客户端了
$ ./redis-cli
//用set指令来设置key、value
127.0.0.1:6379> set name “roc”
OK
//来获取name的值
127.0.0.1:6379> get name
“roc”
//通过客户端来关闭redis服务端
127.0.0.1:6379> shutdown
127.0.0.1:6379>

【redis数据结构 – 简介】

redis是一种高级的key:value存储系统,其中value支持五种数据类型:

1.字符串(strings)

2.字符串列表(lists)

3.字符串集合(sets)

4.有序字符串集合(sorted sets)

5.哈希(hashes)

而关于key,有几个点要提醒大家:

1.key不要太长,尽量不要超过1024字节,这不仅消耗内存,而且会降低查找的效率;

2.key也不要太短,太短的话,key的可读性会降低;

3.在一个项目中,key最好使用统一的命名模式,例如user:10000:passwd。

【redis数据结构 – strings】

有人说,如果只使用redis中的字符串类型,且不使用redis的持久化功能,那么,redis就和memcache非常非常的像了。这说明strings类型是一个很基础的数据类型,也是任何存储系统都必备的数据类型。

我们来看一个最简单的例子:

set mystr “hello world!” //设置字符串类型
get mystr //读取字符串类型

字符串类型的用法就是这么简单,因为是二进制安全的,所以你完全可以把一个图片文件的内容作为字符串来存储。

另外,我们还可以通过字符串类型进行数值操作:

127.0.0.1:6379> set mynum “2”
OK
127.0.0.1:6379> get mynum
“2”
127.0.0.1:6379> incr mynum
(integer) 3
127.0.0.1:6379> get mynum
“3”

看,在遇到数值操作时,redis会将字符串类型转换成数值。

由于INCR等指令本身就具有原子操作的特性,所以我们完全可以利用redis的INCR、INCRBY、DECR、DECRBY等指令来实现原子计数的效果,假如,在某种场景下有3个客户端同时读取了mynum的值(值为2),然后对其同时进行了加1的操作,那么,最后mynum的值一定是5。不少网站都利用redis的这个特性来实现业务上的统计计数需求。

【redis数据结构 – lists】

redis的另一个重要的数据结构叫做lists,翻译成中文叫做“列表”。

首先要明确一点,redis中的lists在底层实现上并不是数组,而是链表,也就是说对于一个具有上百万个元素的lists来说,在头部和尾部插入一个新元素,其时间复杂度是常数级别的,比如用LPUSH在10个元素的lists头部插入新元素,和在上千万元素的lists头部插入新元素的速度应该是相同的。

虽然lists有这样的优势,但同样有其弊端,那就是,链表型lists的元素定位会比较慢,而数组型lists的元素定位就会快得多。

lists的常用操作包括LPUSH、RPUSH、LRANGE等。我们可以用LPUSH在lists的左侧插入一个新元素,用RPUSH在lists的右侧插入一个新元素,用LRANGE命令从lists中指

//新建一个list叫做mylist,并在列表头部插入元素”1″
127.0.0.1:6379> lpush mylist “1”
//返回当前mylist中的元素个数
(integer) 1
//在mylist右侧插入元素”2″
127.0.0.1:6379> rpush mylist “2”
(integer) 2
//在mylist左侧插入元素”0″
127.0.0.1:6379> lpush mylist “0”
(integer) 3
//列出mylist中从编号0到编号1的元素
127.0.0.1:6379> lrange mylist 0 1
1) “0”
2) “1”
//列出mylist中从编号0到倒数第一个元素
127.0.0.1:6379> lrange mylist 0 -1
1) “0”
2) “1”
3) “2”

定一个范围来提取元素。我们来看几个例子:

lists的应用相当广泛,随便举几个例子:

1.我们可以利用lists来实现一个消息队列,而且可以确保先后顺序,不必像MySQL那样还需要通过ORDER BY来进行排序。

2.利用LRANGE还可以很方便的实现分页的功能。

3.在博客系统中,每片博文的评论也可以存入一个单独的list中。

【redis数据结构 – 集合】

redis的集合,是一种无序的集合,集合中的元素没有先后顺序。

集合相关的操作也很丰富,如添加新元素、删除已有元素、取交集、取并集、取差集等。我们来看例子:

//向集合myset中加入一个新元素”one”
127.0.0.1:6379> sadd myset “one”
(integer) 1
127.0.0.1:6379> sadd myset “two”
(integer) 1
//列出集合myset中的所有元素
127.0.0.1:6379> smembers myset
1) “one”
2) “two”
//判断元素1是否在集合myset中,返回1表示存在
127.0.0.1:6379> sismember myset “one”
(integer) 1
//判断元素3是否在集合myset中,返回0表示不存在
127.0.0.1:6379> sismember myset “three”
(integer) 0
//新建一个新的集合yourset
127.0.0.1:6379> sadd yourset “1”
(integer) 1
127.0.0.1:6379> sadd yourset “2”
(integer) 1
127.0.0.1:6379> smembers yourset
1) “1”
2) “2”
//对两个集合求并集
127.0.0.1:6379> sunion myset yourset
1) “1”
2) “one”
3) “2”
4) “two”

对于集合的使用,也有一些常见的方式,比如,QQ有一个社交功能叫做“好友标签”,大家可以给你的好友贴标签,比如“大美女”、“土豪”、“欧巴”等等,这时就可以使用redis的集合来实现,把每一个用户的标签都存储在一个集合之中。

【redis数据结构 – 有序集合】

redis不但提供了无需集合(sets),还很体贴的提供了有序集合(sorted sets)。有序集合中的每个元素都关联一个序号(score),这便是排序的依据。

很多时候,我们都将redis中的有序集合叫做zsets,这是因为在redis中,有序集合相关的操作指令都是以z开头的,比如zrange、zadd、zrevrange、zrangebyscore等等

老规矩,我们来看几个生动的例子:

//新增一个有序集合myzset,并加入一个元素baidu.com,给它赋予的序号是1:

127.0.0.1:6379> zadd myzset 1 baidu.com
(integer) 1
//向myzset中新增一个元素360.com,赋予它的序号是3
127.0.0.1:6379> zadd myzset 3 360.com
(integer) 1
//向myzset中新增一个元素google.com,赋予它的序号是2
127.0.0.1:6379> zadd myzset 2 google.com
(integer) 1
//列出myzset的所有元素,同时列出其序号,可以看出myzset已经是有序的了。
127.0.0.1:6379> zrange myzset 0 -1 with scores
1) “baidu.com”
2) “1”
3) “google.com”
4) “2”
5) “360.com”
6) “3”
//只列出myzset的元素
127.0.0.1:6379> zrange myzset 0 -1
1) “baidu.com”
2) “google.com”
3) “360.com”

【redis数据结构 – 哈希】

最后要给大家介绍的是hashes,即哈希。哈希是从redis-2.0.0版本之后才有的数据结构。

hashes存的是字符串和字符串值之间的映射,比如一个用户要存储其全名、姓氏、年龄等等,就很适合使用哈希。

我们来看一个例子:

//建立哈希,并赋值
127.0.0.1:6379> HMSET user:001 username antirez password P1pp0 age 34
OK
//列出哈希的内容
127.0.0.1:6379> HGETALL user:001
1) “username”
2) “antirez”
3) “password”
4) “P1pp0”
5) “age”
6) “34”
//更改哈希中的某一个值
127.0.0.1:6379> HSET user:001 password 12345
(integer) 0
//再次列出哈希的内容
127.0.0.1:6379> HGETALL user:001
1) “username”
2) “antirez”
3) “password”
4) “12345”
5) “age”
6) “34”

有关hashes的操作,同样很丰富,需要时,大家可以从这里查询。

【聊聊redis持久化 – 两种方式】

redis提供了两种持久化的方式,分别是RDB(Redis DataBase)和AOF(Append Only File)。

RDB,简而言之,就是在不同的时间点,将redis存储的数据生成快照并存储到磁盘等介质上;

AOF,则是换了一个角度来实现持久化,那就是将redis执行过的所有写指令记录下来,在下次redis重新启动时,只要把这些写指令从前到后再重复执行一遍,就可以实现数据恢复了。

其实RDB和AOF两种方式也可以同时使用,在这种情况下,如果redis重启的话,则会优先采用AOF方式来进行数据恢复,这是因为AOF方式的数据恢复完整度更高。

如果你没有数据持久化的需求,也完全可以关闭RDB和AOF方式,这样的话,redis将变成一个纯内存数据库,就像memcache一样。

【聊聊redis持久化 – RDB】

RDB方式,是将redis某一时刻的数据持久化到磁盘中,是一种快照式的持久化方法。

redis在进行数据持久化的过程中,会先将数据写入到一个临时文件中,待持久化过程都结束了,才会用这个临时文件替换上次持久化好的文件。正是这种特性,让我们可以随时来进行备份,因为快照文件总是完整可用的。

对于RDB方式,redis会单独创建(fork)一个子进程来进行持久化,而主进程是不会进行任何IO操作的,这样就确保了redis极高的性能。

如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。

虽然RDB有不少优点,但它的缺点也是不容忽视的。如果你对数据的完整性非常敏感,那么RDB方式就不太适合你,因为即使你每5分钟都持久化一次,当redis故障时,仍然会有近5分钟的数据丢失。所以,redis还提供了另一种持久化方式,那就是AOF。

【聊聊redis持久化 – AOF】

AOF,英文是Append Only File,即只允许追加不允许改写的文件。

如前面介绍的,AOF方式是将执行过的写指令记录下来,在数据恢复时按照从前到后的顺序再将指令都执行一遍,就这么简单。

我们通过配置redis.conf中的appendonly yes就可以打开AOF功能。如果有写操作(如SET等),redis就会被追加到AOF文件的末尾。

默认的AOF持久化策略是每秒钟fsync一次(fsync是指把缓存中的写指令记录到磁盘中),因为在这种情况下,redis仍然可以保持很好的处理性能,即使redis故障,也只会丢失最近1秒钟的数据。

如果在追加日志时,恰好遇到磁盘空间满、inode满或断电等情况导致日志写入不完整,也没有关系,redis提供了redis-check-aof工具,可以用来进行日志修复。

因为采用了追加方式,如果不做任何处理的话,AOF文件会变得越来越大,为此,redis提供了AOF文件重写(rewrite)机制,即当AOF文件的大小超过所设定的阈值时,redis就会启动AOF文件的内容压缩,只保留可以恢复数据的最小指令集。举个例子或许更形象,假如我们调用了100次INCR指令,在AOF文件中就要存储100条指令,但这明显是很低效的,完全可以把这100条指令合并成一条SET指令,这就是重写机制的原理。

在进行AOF重写时,仍然是采用先写临时文件,全部完成后再替换的流程,所以断电、磁盘满等问题都不会影响AOF文件的可用性,这点大家可以放心。

AOF方式的另一个好处,我们通过一个“场景再现”来说明。某同学在操作redis时,不小心执行了FLUSHALL,导致redis内存中的数据全部被清空了,这是很悲剧的事情。不过这也不是世界末日,只要redis配置了AOF持久化方式,且AOF文件还没有被重写(rewrite),我们就可以用最快的速度暂停redis并编辑AOF文件,将最后一行的FLUSHALL命令删除,然后重启redis,就可以恢复redis的所有数据到FLUSHALL之前的状态了。是不是很神奇,这就是AOF持久化方式的好处之一。但是如果AOF文件已经被重写了,那就无法通过这种方法来恢复数据了。

虽然优点多多,但AOF方式也同样存在缺陷,比如在同样数据规模的情况下,AOF文件要比RDB文件的体积大。而且,AOF方式的恢复速度也要慢于RDB方式。

如果你直接执行BGREWRITEAOF命令,那么redis会生成一个全新的AOF文件,其中便包括了可以恢复现有数据的最少的命令集。

如果运气比较差,AOF文件出现了被写坏的情况,也不必过分担忧,redis并不会贸然加载这个有问题的AOF文件,而是报错退出。这时可以通过以下步骤来修复出错的文件:

1.备份被写坏的AOF文件

2.运行redis-check-aof –fix进行修复

3.用diff -u来看下两个文件的差异,确认问题点

4.重启redis,加载修复后的AOF文件

【聊聊redis持久化 – AOF重写】

AOF重写的内部运行原理,我们有必要了解一下。

在重写即将开始之际,redis会创建(fork)一个“重写子进程”,这个子进程会首先读取现有的AOF文件,并将其包含的指令进行分析压缩并写入到一个临时文件中。

与此同时,主工作进程会将新接收到的写指令一边累积到内存缓冲区中,一边继续写入到原有的AOF文件中,这样做是保证原有的AOF文件的可用性,避免在重写过程中出现意外。

当“重写子进程”完成重写工作后,它会给父进程发一个信号,父进程收到信号后就会将内存中缓存的写指令追加到新AOF文件中。

当追加结束后,redis就会用新AOF文件来代替旧AOF文件,之后再有新的写指令,就都会追加到新的AOF文件中了。

【聊聊redis持久化 – 如何选择RDB和AOF】

对于我们应该选择RDB还是AOF,官方的建议是两个同时使用。这样可以提供更可靠的持久化方案。

【聊聊主从 – 用法】

像MySQL一样,redis是支持主从同步的,而且也支持一主多从以及多级从结构。

主从结构,一是为了纯粹的冗余备份,二是为了提升读性能,比如很消耗性能的SORT就可以由从服务器来承担。

redis的主从同步是异步进行的,这意味着主从同步不会影响主逻辑,也不会降低redis的处理性能。

主从架构中,可以考虑关闭主服务器的数据持久化功能,只让从服务器进行持久化,这样可以提高主服务器的处理性能。

在主从架构中,从服务器通常被设置为只读模式,这样可以避免从服务器的数据被误修改。但是从服务器仍然可以接受CONFIG等指令,所以还是不应该将从服务器直接暴露到不安全的网络环境中。如果必须如此,那可以考虑给重要指令进行重命名,来避免命令被外人误执行。

【聊聊主从 – 同步原理】

从服务器会向主服务器发出SYNC指令,当主服务器接到此命令后,就会调用BGSAVE指令来创建一个子进程专门进行数据持久化工作,也就是将主服务器的数据写入RDB文件中。在数据持久化期间,主服务器将执行的写指令都缓存在内存中。

在BGSAVE指令执行完成后,主服务器会将持久化好的RDB文件发送给从服务器,从服务器接到此文件后会将其存储到磁盘上,然后再将其读取到内存中。这个动作完成后,主服务器会将这段时间缓存的写指令再以redis协议的格式发送给从服务器。

另外,要说的一点是,即使有多个从服务器同时发来SYNC指令,主服务器也只会执行一次BGSAVE,然后把持久化好的RDB文件发给多个下游。在redis2.8版本之前,如果从服务器与主服务器因某些原因断开连接的话,都会进行一次主从之间的全量的数据同步;而在2.8版本之后,redis支持了效率更高的增量同步策略,这大大降低了连接断开的恢复成本。

主服务器会在内存中维护一个缓冲区,缓冲区中存储着将要发给从服务器的内容。从服务器在与主服务器出现网络瞬断之后,从服务器会尝试再次与主服务器连接,一旦连接成功,从服务器就会把“希望同步的主服务器ID”和“希望请求的数据的偏移位置(replication offset)”发送出去。主服务器接收到这样的同步请求后,首先会验证主服务器ID是否和自己的ID匹配,其次会检查“请求的偏移位置”是否存在于自己的缓冲区中,如果两者都满足的话,主服务器就会向从服务器发送增量内容。

增量同步功能,需要服务器端支持全新的PSYNC指令。这个指令,只有在redis-2.8之后才具有。

【聊聊redis的事务处理】

众所周知,事务是指“一个完整的动作,要么全部执行,要么什么也没有做”。

在聊redis事务处理之前,要先和大家介绍四个redis指令,即MULTI、EXEC、DISCARD、WATCH。这四个指令构成了redis事务处理的基础。

1.MULTI用来组装一个事务;

2.EXEC用来执行一个事务;

3.DISCARD用来取消一个事务;

4.WATCH用来监视一些key,一旦这些key在事务执行之前被改变,则取消事务的执行。

纸上得来终觉浅,我们来看一个MULTI和EXEC的例子:

redis> MULTI //标记事务开始
OK
redis> INCR user_id //多条命令按顺序入队
QUEUED
redis> INCR user_id
QUEUED
redis> INCR user_id
QUEUED
redis> PING
QUEUED
redis> EXEC //执行
1) (integer) 1
2) (integer) 2
3) (integer) 3
4) PONG

在上面的例子中,我们看到了QUEUED的字样,这表示我们在用MULTI组装事务时,每一个命令都会进入到内存队列中缓存起来,如果出现QUEUED则表示我们这个命令成功插入了缓存队列,在将来执行EXEC时,这些被QUEUED的命令都会被组装成一个事务来执行。

对于事务的执行来说,如果redis开启了AOF持久化的话,那么一旦事务被成功执行,事务中的命令就会通过write命令一次性写到磁盘中去,如果在向磁盘中写的过程中恰好出现断电、硬件故障等问题,那么就可能出现只有部分命令进行了AOF持久化,这时AOF文件就会出现不完整的情况,这时,我们可以使用redis-check-aof工具来修复这一问题,这个工具会将AOF文件中不完整的信息移除,确保AOF文件完整可用。

有关事务,大家经常会遇到的是两类错误:

1.调用EXEC之前的错误

2.调用EXEC之后的错误

“调用EXEC之前的错误”,有可能是由于语法有误导致的,也可能时由于内存不足导致的。只要出现某个命令无法成功写入缓冲队列的情况,redis都会进行记录,在客户端调用EXEC时,redis会拒绝执行这一事务。(这时2.6.5版本之后的策略。在2.6.5之前的版本中,redis会忽略那些入队失败的命令,只执行那些入队成功的命令)。我们来看一个这样的例子:

127.0.0.1:6379> multi
OK
127.0.0.1:6379> haha //一个明显错误的指令
(error) ERR unknown command ‘haha’
127.0.0.1:6379> ping
QUEUED
127.0.0.1:6379> exec
//redis无情的拒绝了事务的执行,原因是“之前出现了错误”
(error) EXECABORT Transaction discarded because of previous errors.

而对于“调用EXEC之后的错误”,redis则采取了完全不同的策略,即redis不会理睬这些错误,而是继续向下执行事务中的其他命令。这是因为,对于应用层面的错误,并不是redis自身需要考虑和处理的问题,所以一个事务中如果某一条命令执行失败,并不会影响接下来的其他命令的执行。我们也来看一个例子:

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set age 23
QUEUED
//age不是集合,所以如下是一条明显错误的指令
127.0.0.1:6379> sadd age 15
QUEUED
127.0.0.1:6379> set age 29
QUEUED
127.0.0.1:6379> exec //执行事务时,redis不会理睬第2条指令执行错误
1) OK
2) (error) WRONGTYPE Operation against a key holding the wrong kind of value
3) OK
127.0.0.1:6379> get age
“29” //可以看出第3条指令被成功执行了

好了,我们来说说最后一个指令“WATCH”,这是一个很好用的指令,它可以帮我们实现类似于“乐观锁”的效果,即CAS(check and set)。

WATCH本身的作用是“监视key是否被改动过”,而且支持同时监视多个key,只要还没真正触发事务,WATCH都会尽职尽责的监视,一旦发现某个key被修改了,在执行EXEC时就会返回nil,表示事务无法触发。

127.0.0.1:6379> set age 23
OK
127.0.0.1:6379> watch age //开始监视age
OK
127.0.0.1:6379> set age 24 //在EXEC之前,age的值被修改了
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set age 25
QUEUED
127.0.0.1:6379> get age
QUEUED
127.0.0.1:6379> exec //触发EXEC
(nil) //事务无法被执行

【教你看懂redis配置 – 简介】

我们可以在启动redis-server时指定应该加载的配置文件,方法如下:

$ ./redis-server /path/to/redis.conf

接下来,我们就来讲解下redis配置文件的各个配置项的含义,注意,本文是基于redis-2.8.4版本进行讲解的。

redis官方提供的redis.conf文件,足有700+行,其中100多行为有效配置行,另外的600多行为注释说明。

在配置文件的开头部分,首先明确了一些度量单位:

# 1k => 1000 bytes
# 1kb => 1024 bytes
# 1m => 1000000 bytes
# 1mb => 1024*1024 bytes
# 1g => 1000000000 bytes
# 1gb => 1024*1024*1024 bytes

可以看出,redis配置中对单位的大小写不敏感,1GB、1Gb和1gB都是相同的。由此也说明,redis只支持bytes,不支持bit单位。

redis支持“主配置文件中引入外部配置文件”,很像C/C++中的include指令,比如:

daemonize no

如果你看过redis的配置文件,会发现还是很有条理的。redis配置文件被分成了几大块区域,它们分别是:

1.通用(general)

2.快照(snapshotting)

3.复制(replication)

4.安全(security)

5.限制(limits)

6.追加模式(append only mode)

7.LUA脚本(lua scripting)

8.慢日志(slow log)

9.事件通知(event notification)

下面我们就来逐一讲解。

【教你看懂redis配置 -通用】

默认情况下,redis并不是以daemon形式来运行的。通过daemonize配置项可以控制redis的运行形式,如果改为yes,那么redis就会以daemon形式运行:

daemonize no

当以daemon形式运行时,redis会生成一个pid文件,默认会生成在/var/run/redis.pid。当然,你可以通过pidfile来指定pid文件生成的位置,比如:

pidfile /path/to/redis.pid

默认情况下,redis会响应本机所有可用网卡的连接请求。当然,redis允许你通过bind配置项来指定要绑定的IP,比如:

bind 192.168.1.2 10.8.4.2

redis的默认服务端口是6379,你可以通过port配置项来修改。如果端口设置为0的话,redis便不会监听端口了

port 6379

有些同学会问“如果redis不监听端口,还怎么与外界通信呢”,其实redis还支持通过unix socket方式来接收请求。可以通过unixsocket配置项来指定unix socket文件的路径,并通过unixsocketperm来指定文件的权限。

unixsocket /tmp/redis.sock
unixsocketperm 755

当一个redis-client一直没有请求发向server端,那么server端有权主动关闭这个连接,可以通过timeout来设置“空闲超时时限”,0表示永不关闭。

timeout 0

TCP连接保活策略,可以通过tcp-keepalive配置项来进行设置,单位为秒,假如设置为60秒,则server端会每60秒向连接空闲的客户端发起一次ACK请求,以检查客户端是否已经挂掉,对于无响应的客户端则会关闭其连接。所以关闭一个连接最长需要120秒的时间。如果设置为0,则不会进行保活检测。

tcp-keepalive 0

redis支持通过loglevel配置项设置日志等级,共分四级,即 debug 、 verbose 、 notice 、 warning 。

loglevel notice

redis也支持通过logfile配置项来设置日志文件的生成位置。如果设置为空字符串,则redis会将日志输出到标准输出。假如你在daemon情况下将日志设置为输出到标准输出,则日志会被写到/dev/null中。

logfile “”

如果希望日志打印到syslog中,也很容易,通过syslog-enabled来控制。另外,syslog-ident还可以让你指定syslog里的日志标志,比如:

syslog-ident redis

而且还支持指定syslog设备,值可以是USER或LOCAL0-LOCAL7。具体可以参考syslog服务本身的用法。

syslog-facility local0!

对于redis来说,可以设置其数据库的总数量,假如你希望一个redis包含16个数据库,那么设置如下:

databases 16

这16个数据库的编号将是0到15。默认的数据库是编号为0的数据库。用户可以使用select <DBid>来选择相应的数据库。

【教你看懂redis配置 – 快照】

快照,主要涉及的是redis的RDB持久化相关的配置,我们来一起看一看。

我们可以用如下的指令来让数据保存到磁盘上,即控制RDB快照功能:

save <seconds> <changes>

举例来说:

save 900 1 //表示每15分钟且至少有1个key改变,就触发一次持久化
save 300 10 //表示每5分钟且至少有10个key改变,就触发一次持久化
save 60 10000 //表示每60秒至少有10000个key改变,就触发一次持久化

如果你想禁用RDB持久化的策略,只要不设置任何save指令就可以,或者给save传入一个空字符串参数也可以达到相同效果,就像这样:

save “”

如果用户开启了RDB快照功能,那么在redis持久化数据到磁盘时如果出现失败,默认情况下,redis会停止接受所有的写请求。这样做的好处在于可以让用户很明确的知道内存中的数据和磁盘上的数据已经存在不一致了。如果redis不顾这种不一致,一意孤行的继续接收写请求,就可能会引起一些灾难性的后果。

如果下一次RDB持久化成功,redis会自动恢复接受写请求。

当然,如果你不在乎这种数据不一致或者有其他的手段发现和控制这种不一致的话,你完全可以关闭这个功能,以便在快照写入失败时,也能确保redis继续接受新的写请求。配置项如下:

stop-writes-on-bgsave-error yes

对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能,但是存储在磁盘上的快照会比较大。

rdbcompression yes

在存储快照后,我们还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果你希望获取到最大的性能提升,可以关闭此功能。

rdbchecksum yes

我们还可以设置快照文件的名称,默认是这样配置的:

dbfilename dump.rdb

最后,你还可以设置这个快照文件存放的路径。比如默认设置就是当前文件夹:

dir ./

【教你看懂redis配置 – 复制】

redis提供了主从同步功能。

通过slaveof配置项可以控制某一个redis作为另一个redis的从服务器,通过指定IP和端口来定位到主redis的位置。一般情况下,我们会建议用户为从redis设置一个不同频率的快照持久化的周期,或者为从redis配置一个不同的服务端口等等。

slaveof <masterip> <masterport>

如果主redis设置了验证密码的话(使用requirepass来设置),则在从redis的配置中要使用masterauth来设置校验密码,否则的话,主redis会拒绝从redis的访问请求。

masterauth <master-password>

当从redis失去了与主redis的连接,或者主从同步正在进行中时,redis该如何处理外部发来的访问请求呢?这里,从redis可以有两种选择:

第一种选择:如果slave-serve-stale-data设置为yes(默认),则从redis仍会继续响应客户端的读写请求。

第二种选择:如果slave-serve-stale-data设置为no,则从redis会对客户端的请求返回“SYNC with master in progress”,当然也有例外,当客户端发来INFO请求和SLAVEOF请求,从redis还是会进行处理。

你可以控制一个从redis是否可以接受写请求。将数据直接写入从redis,一般只适用于那些生命周期非常短的数据,因为在主从同步时,这些临时数据就会被清理掉。自从redis2.6版本之后,默认从redis为只读。

slave-read-only yes

只读的从redis并不适合直接暴露给不可信的客户端。为了尽量降低风险,可以使用rename-command指令来将一些可能有破坏力的命令重命名,避免外部直接调用。比如:

rename-command CONFIG b840fc02d524045429941cc15f59e41cb7be6c52

从redis会周期性的向主redis发出PING包。你可以通过repl_ping_slave_period指令来控制其周期。默认是10秒。

repl-ping-slave-period 10

在主从同步时,可能在这些情况下会有超时发生:

1.以从redis的角度来看,当有大规模IO传输时。

2.以从redis的角度来看,当数据传输或PING时,主redis超时

3.以主redis的角度来看,在回复从redis的PING时,从redis超时

用户可以设置上述超时的时限,不过要确保这个时限比repl-ping-slave-period的值要大,否则每次主redis都会认为从redis超时。

repl-timeout 60

我们可以控制在主从同步时是否禁用TCP_NODELAY。如果开启TCP_NODELAY,那么主redis会使用更少的TCP包和更少的带宽来向从redis传输数据。但是这可能会增加一些同步的延迟,大概会达到40毫秒左右。如果你关闭了TCP_NODELAY,那么数据同步的延迟时间会降低,但是会消耗更多的带宽。(如果你不了解TCP_NODELAY,可以到这里来科普一下)。

repl-disable-tcp-nodelay no

我们还可以设置同步队列长度。队列长度(backlog)是主redis中的一个缓冲区,在与从redis断开连接期间,主redis会用这个缓冲区来缓存应该发给从redis的数据。这样的话,当从redis重新连接上之后,就不必重新全量同步数据,只需要同步这部分增量数据即可。

repl-backlog-size 1mb

如果主redis等了一段时间之后,还是无法连接到从redis,那么缓冲队列中的数据将被清理掉。我们可以设置主redis要等待的时间长度。如果设置为0,则表示永远不清理。默认是1个小时。

repl-backlog-ttl 3600

我们可以给众多的从redis设置优先级,在主redis持续工作不正常的情况,优先级高的从redis将会升级为主redis。而编号越小,优先级越高。比如一个主redis有三个从redis,优先级编号分别为10、100、25,那么编号为10的从redis将会被首先选中升级为主redis。当优先级被设置为0时,这个从redis将永远也不会被选中。默认的优先级为100。

slave-priority 100

假如主redis发现有超过M个从redis的连接延时大于N秒,那么主redis就停止接受外来的写请求。这是因为从redis一般会每秒钟都向主redis发出PING,而主redis会记录每一个从redis最近一次发来PING的时间点,所以主redis能够了解每一个从redis的运行情况。

min-slaves-to-write 3
min-slaves-max-lag 10

上面这个例子表示,假如有大于等于3个从redis的连接延迟大于10秒,那么主redis就不再接受外部的写请求。上述两个配置中有一个被置为0,则这个特性将被关闭。默认情况下min-slaves-to-write为0,而min-slaves-max-lag为10。

【教你看懂redis配置 – 安全】

我们可以要求redis客户端在向redis-server发送请求之前,先进行密码验证。当你的redis-server处于一个不太可信的网络环境中时,相信你会用上这个功能。由于redis性能非常高,所以每秒钟可以完成多达15万次的密码尝试,所以你最好设置一个足够复杂的密码,否则很容易被黑客破解。

requirepass zhimakaimen

这里我们通过requirepass将密码设置成“芝麻开门”。

redis允许我们对redis指令进行更名,比如将一些比较危险的命令改个名字,避免被误执行。比如可以把CONFIG命令改成一个很复杂的名字,这样可以避免外部的调用,同时还可以满足内部调用的需要:

rename-command CONFIG b840fc02d524045429941cc15f59e41cb7be6c89

我们甚至可以禁用掉CONFIG命令,那就是把CONFIG的名字改成一个空字符串:

rename-command CONFIG “”

但需要注意的是,如果你使用AOF方式进行数据持久化,或者需要与从redis进行通信,那么更改指令的名字可能会引起一些问题。

【教你看懂redis配置 -限制】

我们可以设置redis同时可以与多少个客户端进行连接。默认情况下为10000个客户端。当你无法设置进程文件句柄限制时,redis会设置为当前的文件句柄限制值减去32,因为redis会为自身内部处理逻辑留一些句柄出来。

如果达到了此限制,redis则会拒绝新的连接请求,并且向这些连接请求方发出“max number of clients reached”以作回应。

maxclients 10000

我们甚至可以设置redis可以使用的内存量。一旦到达内存使用上限,redis将会试图移除内部数据,移除规则可以通过maxmemory-policy来指定。

如果redis无法根据移除规则来移除内存中的数据,或者我们设置了“不允许移除”,那么redis则会针对那些需要申请内存的指令返回错误信息,比如SET、LPUSH等。但是对于无内存申请的指令,仍然会正常响应,比如GET等。

maxmemory <bytes>

需要注意的一点是,如果你的redis是主redis(说明你的redis有从redis),那么在设置内存使用上限时,需要在系统中留出一些内存空间给同步队列缓存,只有在你设置的是“不移除”的情况下,才不用考虑这个因素。

对于内存移除规则来说,redis提供了多达6种的移除规则。他们是:

1.volatile-lru:使用LRU算法移除过期集合中的key

2.allkeys-lru:使用LRU算法移除key

3.volatile-random:在过期集合中移除随机的key

4.allkeys-random:移除随机的key

5.volatile-ttl:移除那些TTL值最小的key,即那些最近才过期的key。

6.noeviction:不进行移除。针对写操作,只是返回错误信息。

无论使用上述哪一种移除规则,如果没有合适的key可以移除的话,redis都会针对写请求返回错误信息。

maxmemory-policy volatile-lru

LRU算法和最小TTL算法都并非是精确的算法,而是估算值。所以你可以设置样本的大小。假如redis默认会检查三个key并选择其中LRU的那个,那么你可以改变这个key样本的数量。

maxmemory-samples 3

最后,我们补充一个信息,那就是到目前版本(2.8.4)为止,redis支持的写指令包括了如下这些:

set setnx setex append
incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
getset mset msetnx exec sort

【教你看懂redis配置 – 追加模式】

默认情况下,redis会异步的将数据持久化到磁盘。这种模式在大部分应用程序中已被验证是很有效的,但是在一些问题发生时,比如断电,则这种机制可能会导致数分钟的写请求丢失。

如博文上半部分中介绍的,追加文件(Append Only File)是一种更好的保持数据一致性的方式。即使当服务器断电时,也仅会有1秒钟的写请求丢失,当redis进程出现问题且操作系统运行正常时,甚至只会丢失一条写请求。

我们建议大家,AOF机制和RDB机制可以同时使用,不会有任何冲突。对于如何保持数据一致性的讨论,请参见本文。

appendonly no

我们还可以设置aof文件的名称:

appendfilename “appendonly.aof”

fsync()调用,用来告诉操作系统立即将缓存的指令写入磁盘。一些操作系统会“立即”进行,而另外一些操作系统则会“尽快”进行。

redis支持三种不同的模式:

1.no:不调用fsync()。而是让操作系统自行决定sync的时间。这种模式下,redis的性能会最快。

2.always:在每次写请求后都调用fsync()。这种模式下,redis会相对较慢,但数据最安全。

3.everysec:每秒钟调用一次fsync()。这是性能和安全的折衷。

默认情况下为everysec。有关数据一致性的揭秘,可以参考本文。

appendfsync everysec

当fsync方式设置为always或everysec时,如果后台持久化进程需要执行一个很大的磁盘IO操作,那么redis可能会在fsync()调用时卡住。目前尚未修复这个问题,这是因为即使我们在另一个新的线程中去执行fsync(),也会阻塞住同步写调用。

为了缓解这个问题,我们可以使用下面的配置项,这样的话,当BGSAVE或BGWRITEAOF运行时,fsync()在主进程中的调用会被阻止。这意味着当另一路进程正在对AOF文件进行重构时,redis的持久化功能就失效了,就好像我们设置了“appendsync none”一样。如果你的redis有时延问题,那么请将下面的选项设置为yes。否则请保持no,因为这是保证数据完整性的最安全的选择。

no-appendfsync-on-rewrite no

我们允许redis自动重写aof。当aof增长到一定规模时,redis会隐式调用BGREWRITEAOF来重写log文件,以缩减文件体积。

redis是这样工作的:redis会记录上次重写时的aof大小。假如redis自启动至今还没有进行过重写,那么启动时aof文件的大小会被作为基准值。这个基准值会和当前的aof大小进行比较。如果当前aof大小超出所设置的增长比例,则会触发重写。另外,你还需要设置一个最小大小,是为了防止在aof很小时就触发重写。

Hello World!auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

【教你看懂redis配置 – LUA脚本】

lua脚本的最大运行时间是需要被严格限制的,要注意单位是毫秒:

lua-time-limit 5000

如果此值设置为0或负数,则既不会有报错也不会有时间限制。

【教你看懂redis配置 – 慢日志】

redis慢日志是指一个系统进行日志查询超过了指定的时长。这个时长不包括IO操作,比如与客户端的交互、发送响应内容等,而仅包括实际执行查询命令的时间。

针对慢日志,你可以设置两个参数,一个是执行时长,单位是微秒,另一个是慢日志的长度。当一个新的命令被写入日志时,最老的一条会从命令日志队列中被移除。

单位是微秒,即1000000表示一秒。负数则会禁用慢日志功能,而0则表示强制记录每一个命令。

slowlog-log-slower-than 10000

慢日志最大长度,可以随便填写数值,没有上限,但要注意它会消耗内存。你可以使用SLOWLOG RESET来重设这个值。

slowlog-max-len 128

【教你看懂redis配置 – 事件通知】

redis可以向客户端通知某些事件的发生。这个特性的具体解释可以参见本文。

【教你看懂redis配置 – 高级配置】

有关哈希数据结构的一些配置项:

hash-max-ziplist-entries 512
hash-max-ziplist-value 64

有关列表数据结构的一些配置项:

list-max-ziplist-entries 512
list-max-ziplist-value 64

有关集合数据结构的配置项:

set-max-intset-entries 512

有关有序集合数据结构的配置项:

zset-max-ziplist-entries 128
zset-max-ziplist-value 64

关于是否需要再哈希的配置项:

activerehashing yes

关于客户端输出缓冲的控制项:

client-output-buffer-limit normal 0 0 0
client-output-buffer-limit slave 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60

有关频率的配置项:

hz 10

有关重写aof的配置项

aof-rewrite-incremental-fsync yes

至此,redis的入门内容就结束了,内容实在不少,但相对来说都很基础,本文没有涉及redis集群、redis工作原理、redis源码、redis相关LIB库等内容,后续会陆续奉献,大家敬请期待:)

Java8新特性教程

接口的默认方法

Java8允许开发者通过使用关键字 default  向接口中加入非抽象方法。这一新的特性被称之为扩展方法。下面是我们的第一个例子:
  1. interface Formula {
  2.     double calculate(int a);
  3.     default double sqrt(int a) {
  4.         return Math.sqrt(a);
  5.     }
  6. }

在抽象方法calculator之外,接口Formula还定义了一个默认方法sqrt。实现类只需要实现抽象方法calculate。默认方法sqrt可以在定义之外使用。如:

  1. Formula formula = new Formula() {
  2.     @Override
  3.     public double calculate(int a) {
  4.         return sqrt(a * 100);
  5.     }
  6. };
  7. formula.calculate(100);     // 100.0
  8. formula.sqrt(16);           // 4.0

formula被实现为一个匿名类。代码有点啰嗦:六行代码里就就只有一句简单的计算:sqrt(a*100)。我们将在下面部分看到一种用java8实现的更加简洁的办法。

Lambda表达式

让我们使用一个简单的例子来展示在java8以前是如何对字符串列表进行排序的:
  1. List<String> names = Arrays.asList(“peter”“anna”“mike”“xenia”);
  2. Collections.sort(names, new Comparator<String>() {
  3.     @Override
  4.     public int compare(String a, String b) {
  5.         return b.compareTo(a);
  6.     }
  7. });

这个静态工具方法Collections.sort接受一个列表和一个用于元素比较的比较器。你会发现自己会经常创建匿名类并把它们传递给排序方法。

为了不用整天创建这些匿名类,java8带来了一个非常简短的语法–lambda表达式:
  1. Collections.sort(names, (String a, String b) -> {
  2.     return b.compareTo(a);
  3. });

现在的代码已经变得简短和便于阅读。但是,实际上,它可以变得更加简短:

  1. Collections.sort(names, (String a, String b) -> b.compareTo(a));
对于这种一行代码体的表达式,你可以直接省略掉大括号{}和return关键字。它就变成下面这种更加简短的写法:
  1. Collections.sort(names, (a, b) -> b.compareTo(a));

java编译器能够探测到这些参数的类型,这样使得的你可以直接跳过它们。下面我们来解答为什么lambda表示式可以这样随意的使用。

功能性接口

lambda表达式是如何和java系统的类型进行对应的?每个lambda表达式都对应一个指定的类型,这个指定的类型是由接口确定的。该接口被称之为功能性接口,它必须且恰好只包含一个抽象方法声明。被指定接口类型所对应的lambda表达式刚好和这个接口的抽象方法想匹配。因为默认方法不是抽象的,因此你可以在你的功能性接口中添加多个默认方法。
我们可以将任意的接口用作lambda表示式,只要该接口仅仅包含一个抽象方法。为了确保你定义的接口达到要求,你可以在接口上添加@FunctionInterface注解。编译器可以检测到该注解并判断你的接口是否满足条件,如果 你定义的接口包含多个抽象方法时,编译器便会报错。
示例:
  1. @FunctionalInterface
  2. interface Converter<F, T> {
  3.     T convert(F from);
  4. }
  1. Converter<String, Integer> converter = (from) -> Integer.valueOf(from);
  2. Integer converted = converter.convert(“123”);
  3. System.out.println(converted);    // 123

如果FunctionInterface注解被添加,你定义的接口将总会被检测。

方法和构造函数引用

前部分的示例在使用静态方法引用的情况下可以被进一步的简化:

  1. Converter<String, Integer> converter = Integer::valueOf;
  2. Integer converted = converter.convert(“123”);
  3. System.out.println(converted);   // 123

java8可以让你通过关键字::来传递方法和构造函数的引用。上面的示例展示了如何引用一个静态方法。我们同样也可以引用对象方法。

  1. class Something {
  2.     String startsWith(String s) {
  3.         return String.valueOf(s.charAt(0));
  4.     }
  5. }
  1. Something something = new Something();
  2. Converter<String, String> converter = something::startsWith;
  3. String converted = converter.convert(“Java”);
  4. System.out.println(converted);    // “J”

现在我们将看到关键字::如何为构造函数工作。首先我们定义一个拥有不同构造函数的bean类:

  1. class Person {
  2.     String firstName;
  3.     String lastName;
  4.     Person() {}
  5.     Person(String firstName, String lastName) {
  6.         this.firstName = firstName;
  7.         this.lastName = lastName;
  8.     }
  9. }

接下来我们定义一个用来创建类person的工厂接口:

  1. </pre><pre code_snippet_id=“265470” snippet_file_name=“blog_20140330_13_2612710” name=“code” class=“java”>interface PersonFactory<P extends Person> {
  2.     P create(String firstName, String lastName);
  3. }
不使用通常的手动实现工厂类,我们通过使用构造函数将所有的工作联合在一起:
  1. PersonFactory<Person> personFactory = Person::new;
  2. Person person = personFactory.create(“Peter”“Parker”);

我们通过Person::new创建一个指向Person构造函数的引用。java编译器自动的选择正确的构造函数来匹配PersonFactory.create的函数签名。

Lambda范围

在lambda表达式里访问外部变量和匿名类的方式是十分相似的。你可以在lambda中访问外部的final变量,访问实例字段和静态变量的方法也是如此。

访问本地变量

我们可以访问在lambda表示式之外的本地final变量:
  1. final int num = 1;
  2. Converter<Integer, String> stringConverter =
  3.         (from) -> String.valueOf(from + num);
  4. stringConverter.convert(2);     // 3

但是和匿名变量不同的是变量num不必强制的被声明为final。下面的代码依然是合法的:

  1. int num = 1;
  2. Converter<Integer, String> stringConverter =
  3.         (from) -> String.valueOf(from + num);
  4. stringConverter.convert(2);     // 3

但是实际上,变量num在编译期是被隐式的转换为fianl类型的。下面的代码是不能被成功的编译的:

  1. int num = 1;
  2. Converter<Integer, String> stringConverter =
  3.         (from) -> String.valueOf(from + num);
  4. num = 3;

在lambda表达式内部向变量num写入值同样是不允许的。

访问对象字段和静态变量

和访问本地变量相反,我们在lambda表达式里即可以读取也可以写入对象字段和静态变量。这一准则同样适用于匿名类。
  1. class Lambda4 {
  2.     static int outerStaticNum;
  3.     int outerNum;
  4.     void testScopes() {
  5.         Converter<Integer, String> stringConverter1 = (from) -> {
  6.             outerNum = 23;
  7.             return String.valueOf(from);
  8.         };
  9.         Converter<Integer, String> stringConverter2 = (from) -> {
  10.             outerStaticNum = 72;
  11.             return String.valueOf(from);
  12.         };
  13.     }
  14. }

访问接口默认方法

还记得第一部分的formula示例么?接口formula定义了一个默认方法sqrt,这个方法可以被formula的实例和匿名实例所访问。
但是这个方法不能被lambda表达式所访问。
默认方法不能被lambda表示式内部的代码访问。下面的代码不能通过编译。
  1. Formula formula = (a) -> sqrt( a * 100);

内建的功能性接口

JDK1.8包括了许多功能性接口。它们中的一些是老版本中被熟知的接口,例如Comparator和Runnable。这些已存在的接口已经通过@FunctionalInterface注解扩展为支持Lambda表达式。
同时Java8的API也包含了很多新的功能性接口简化你的开发。一些新的接口是来自非常出名的Google Guava库。即使你已经对这库十分熟悉了,你也应当留意下这些接口是如何被扩展的。

断言接口(Predicates)

Predicates是只拥有一个参数的Boolean型功能的接口。这个接口拥有多个默认方法用于构成predicates复杂的逻辑术语。
  1. Predicate<String> predicate = (s) -> s.length() > 0;
  2. predicate.test(“foo”);              // true
  3. predicate.negate().test(“foo”);     // false
  4. Predicate<Boolean> nonNull = Objects::nonNull;
  5. Predicate<Boolean> isNull = Objects::isNull;
  6. Predicate<String> isEmpty = String::isEmpty;
  7. Predicate<String> isNotEmpty = isEmpty.negate();

功能接口(Functions)

Functions接受一个参数并产生一个结果。默认方法能够用于将多个函数链接在一起。
  1. Function<String, Integer> toInteger = Integer::valueOf;
  2. Function<String, String> backToString = toInteger.andThen(String::valueOf);
  3. backToString.apply(“123”);     // “123”

供应接口(Suppliers)

Suppliers对于给定的泛型类型产生一个实例。不同于Functions,Suppliers不需要任何参数。
  1. Supplier<Person> personSupplier = Person::new;
  2. personSupplier.get();   // new Person

消费接口(Consumers)

Consumers代表在只有一个输入参数时操作被如何执行。
  1. Consumer<Person> greeter = (p) -> System.out.println(“Hello, “ + p.firstName);
  2. greeter.accept(new Person(“Luke”“Skywalker”));

比较接口(Comparators)

Comparators在老版本中就已经被熟知。Java8向该接口中添加了多种默认方法。
  1. Comparator<Person> comparator = (p1, p2) -> p1.firstName.compareTo(p2.firstName);
  2. Person p1 = new Person(“John”“Doe”);
  3. Person p2 = new Person(“Alice”“Wonderland”);
  4. comparator.compare(p1, p2);             // > 0
  5. comparator.reversed().compare(p1, p2);  // < 0

选项接口(Optionals)

Optionals并不是功能性接口,反而它是一种特殊的工具用来阻止NullPointerException。我们首先快速的浏览Optionals是如何工作的,因为它在下一章节是十分重要的概念。

Optional是一种可以包含null和non-null值的简单容器。考虑到方法可以返回non-null结果,偶尔也可能任何都不返回。在Java8中,你可以返回Optional而不是返回null。
  1. Optional<String> optional = Optional.of(“bam”);
  2. optional.isPresent();           // true
  3. optional.get();                 // “bam”
  4. optional.orElse(“fallback”);    // “bam”
  5. optional.ifPresent((s) -> System.out.println(s.charAt(0)));     // “b”

流接口(Streams)

java.util.Stream代表着一串你可以在其上进行多种操作的元素。流操作既可以是连续的也可以是中断的。中断操作返回操作结果。而连续操作返回流本身,这样你就可以在该行上继续操作。流是创建在数据源上的,例如:java.util.Collection、list集合和set集合。流操作既可以顺序执行也可以并行执行。

我们首先了解下顺序的流是如何工作的。我们首先创建一个字符串链表。
  1. List<String> stringCollection = new ArrayList<>();
  2. stringCollection.add(“ddd2”);
  3. stringCollection.add(“aaa2”);
  4. stringCollection.add(“bbb1”);
  5. stringCollection.add(“aaa1”);
  6. stringCollection.add(“bbb3”);
  7. stringCollection.add(“ccc”);
  8. stringCollection.add(“bbb2”);
  9. stringCollection.add(“ddd1”);

Java8的Collections类已经被扩展了,你可以简单的调用Collection.stream()或者Collection.parallelSteam()来创建流。下面部分将介绍大部分流操作。

Filter

Filter接受一个predicate来过滤流中的所有元素。这个操作是连续的,它可以让我们在结果上继续调用另外一个流操作forEach。ForEach接受一个consumer,它被用来对过滤流中的每个元素执行操作。ForEach是一个中断操作,因此我们不能在ForEach后调用其他流操作。
  1. stringCollection
  2.     .stream()
  3.     .filter((s) -> s.startsWith(“a”))
  4.     .forEach(System.out::println);
  5. // “aaa2”, “aaa1”

Sorted

Sorted是一个连续操作,它返回流的已排序版本。如果你没有显示的指定Comparator,那么流中元素的排序规则为默认的。
  1. stringCollection
  2.     .stream()
  3.     .sorted()
  4.     .filter((s) -> s.startsWith(“a”))
  5.     .forEach(System.out::println);
  6. // “aaa1”, “aaa2”

需要注意的是sorted只创建了流的排序结果,它并没有改变集合中元素的排序位置。stringCollection中元素排序是没有改变的。

  1. System.out.println(stringCollection);
  2. // ddd2, aaa2, bbb1, aaa1, bbb3, ccc, bbb2, ddd1

Map

连续性操作map通过指定的Function将流中的每个元素转变为另外的对象。下面的示例将每个字符串转换为大写的字符串。此外,你也可以使用map将每个元素的类型改变为其它类型。转换后流的泛型类型依赖于你传入的Function的泛型类型。
  1. stringCollection
  2.     .stream()
  3.     .map(String::toUpperCase)
  4.     .sorted((a, b) -> b.compareTo(a))
  5.     .forEach(System.out::println);
  6. // “DDD2”, “DDD1”, “CCC”, “BBB3”, “BBB2”, “AAA2”, “AAA1”

Match

各种匹配操作可以用来检测是否某种predicate和流中元素相匹配。所有的这些操作是中断的并返回一个boolean结果。
  1. boolean anyStartsWithA =
  2.     stringCollection
  3.         .stream()
  4.         .anyMatch((s) -> s.startsWith(“a”));
  5. System.out.println(anyStartsWithA);      // true
  6. boolean allStartsWithA =
  7.     stringCollection
  8.         .stream()
  9.         .allMatch((s) -> s.startsWith(“a”));
  10. System.out.println(allStartsWithA);      // false
  11. boolean noneStartsWithZ =
  12.     stringCollection
  13.         .stream()
  14.         .noneMatch((s) -> s.startsWith(“z”));
  15. System.out.println(noneStartsWithZ);      // true

Count

Count是中断型操作,它返回流中的元素数量。
  1. long startsWithB =
  2.     stringCollection
  3.         .stream()
  4.         .filter((s) -> s.startsWith(“b”))
  5.         .count();
  6. System.out.println(startsWithB);    // 3

Reduce

这个中断性操作使用指定的function对流中元素实施消减策略。此操作的返回值是一个包括所有被消减元素的Optional。
  1. Optional<String> reduced =
  2.     stringCollection
  3.         .stream()
  4.         .sorted()
  5.         .reduce((s1, s2) -> s1 + “#” + s2);
  6. reduced.ifPresent(System.out::println);
  7. // “aaa1#aaa2#bbb1#bbb2#bbb3#ccc#ddd1#ddd2”

Parallel Streams

在前面部分我们提到流可以是顺序的也可以是并行的。顺序流的操作是在单线程上执行的,而并行流的操作是在多线程上并发执行的。
随后的例子我们展示了并行流可以多么容易的提高性能。
首先,我们创建一个包含唯一元素的大容器:
  1. int max = 1000000;
  2. List<String> values = new ArrayList<>(max);
  3. for (int i = 0; i < max; i++) {
  4.     UUID uuid = UUID.randomUUID();
  5.     values.add(uuid.toString());
  6. }

现在我们开始测试排序这些元素需要多长时间。

Sequential Sort
  1. long t0 = System.nanoTime();
  2. long count = values.stream().sorted().count();
  3. System.out.println(count);
  4. long t1 = System.nanoTime();
  5. long millis = TimeUnit.NANOSECONDS.toMillis(t1 – t0);
  6. System.out.println(String.format(“sequential sort took: %d ms”, millis));
  7. // sequential sort took: 899 ms

Parallel Sort

  1. long t0 = System.nanoTime();
  2. long count = values.parallelStream().sorted().count();
  3. System.out.println(count);
  4. long t1 = System.nanoTime();
  5. long millis = TimeUnit.NANOSECONDS.toMillis(t1 – t0);
  6. System.out.println(String.format(“parallel sort took: %d ms”, millis));
  7. // parallel sort took: 472 ms

你会观察到这两种模式的代码基本上市一致的,但是并行排序所花费的时间大约是顺序排序的一半。

Map

我们已经提到maps不支持流。然而现在maps包括了许多新的非常有用的方法用于执行通用任务。
  1. Map<Integer, String> map = new HashMap<>();
  2. for (int i = 0; i < 10; i++) {
  3.     map.putIfAbsent(i, “val” + i);
  4. }
  5. map.forEach((id, val) -> System.out.println(val));
  1. </pre>上述的代码应该很清晰了:putIfAbsent使得我们不用写是否为null值的检测语句;forEach使用consumer来对map中的每个元素进行操作。</div><div></div><div>下面的例子向我们展示使用功能性函数在map里执行代码:</div><div><pre code_snippet_id=“265470” snippet_file_name=“blog_20140330_37_9422552” name=“code” class=“java”>map.computeIfPresent(3, (num, val) -> val + num);
  2. map.get(3);             // val33
  3. map.computeIfPresent(9, (num, val) -> null);
  4. map.containsKey(9);     // false
  5. map.computeIfAbsent(23, num -> “val” + num);
  6. map.containsKey(23);    // true
  7. map.computeIfAbsent(3, num -> “bam”);
  8. map.get(3);             // val33

接下来,我们将学习如何删除给定键所对应的元素。删除操作还需要满足给定的值需要和map中的值想等:

  1. map.remove(3“val3”);
  2. map.get(3);             // val33
  3. map.remove(3“val33”);
  4. map.get(3);             // null

其他一些帮助性方法:

  1. map.getOrDefault(42“not found”);  // not found

合并map中的实体是十分容易的:

  1. map.merge(9“val9”, (value, newValue) -> value.concat(newValue));
  2. map.get(9);             // val9
  3. map.merge(9“concat”, (value, newValue) -> value.concat(newValue));
  4. map.get(9);             // val9concat

如果map不存在指定的键,那么它将把该键值对key/value加入map中。反而,如果存在,它将调用function来进行合并操作。

Date API

Java8在包java.time下面包括了一款新的date和time的API。新的Date API和Joda-Time库是相兼容的,但是它们不是一样的。下面的示例覆盖了新API中的重要部分。

Clock

Clock提供了访问当前日期和时间的方法。Clock是时区敏感的并且它可以被用来替代System.currentTimeMillis进行获取当前毫秒数。同时,时间轴上的时间点是可以用类Instant来表示的。Instants可以被用来创建遗留的java.util.Date对象。
  1. Clock clock = Clock.systemDefaultZone();
  2. long millis = clock.millis();
  3. Instant instant = clock.instant();
  4. Date legacyDate = Date.from(instant);   // legacy java.util.Date

TimeZones

TimeZones被用来表示ZoneId。它们可以通过静态工厂方法访问。TImeZones定义了时差,它在instants和本地日期时间转换上十分重要。
  1. System.out.println(ZoneId.getAvailableZoneIds());
  2. // prints all available timezone ids
  3. ZoneId zone1 = ZoneId.of(“Europe/Berlin”);
  4. ZoneId zone2 = ZoneId.of(“Brazil/East”);
  5. System.out.println(zone1.getRules());
  6. System.out.println(zone2.getRules());
  7. // ZoneRules[currentStandardOffset=+01:00]
  8. // ZoneRules[currentStandardOffset=-03:00]

LocalTime

本地时间代表了一个和时区无关的时间,e.g. 10pm or 17:30:15. 下面的示例创建了前部分展示的两个时区的本地时间。然后,我们将比较这两个时间并计算出这两个时间在小时和分钟数上的差异。
  1. LocalTime now1 = LocalTime.now(zone1);
  2. LocalTime now2 = LocalTime.now(zone2);
  3. System.out.println(now1.isBefore(now2));  // false
  4. long hoursBetween = ChronoUnit.HOURS.between(now1, now2);
  5. long minutesBetween = ChronoUnit.MINUTES.between(now1, now2);
  6. System.out.println(hoursBetween);       // -3
  7. System.out.println(minutesBetween);     // -239

LocalTime包含了多个工厂方法用来简化创建过程,其中也包括通过字符串来创建时间:

  1. LocalTime late = LocalTime.of(235959);
  2. System.out.println(late);       // 23:59:59
  3. DateTimeFormatter germanFormatter =
  4.     DateTimeFormatter
  5.         .ofLocalizedTime(FormatStyle.SHORT)
  6.         .withLocale(Locale.GERMAN);
  7. LocalTime leetTime = LocalTime.parse(“13:37”, germanFormatter);
  8. System.out.println(leetTime);   // 13:37

LocalDate

LocalDate代表了一个可区分日期,e.g. 2014-03-11。 它是不变的同时工作原理类似于LocalTime。下面的例子描绘了通过加减年,月,日来计算出一个新的日期。需要注意的是这每个操作都返回一个新的实例。
  1. LocalDate today = LocalDate.now();
  2. LocalDate tomorrow = today.plus(1, ChronoUnit.DAYS);
  3. LocalDate yesterday = tomorrow.minusDays(2);
  4. LocalDate independenceDay = LocalDate.of(2014, Month.JULY, 4);
  5. DayOfWeek dayOfWeek = independenceDay.getDayOfWeek();
  6. System.out.println(dayOfWeek);    // FRIDAY

从字符串解析出LocalDate和解析LocalTime一样简单:

  1. DateTimeFormatter germanFormatter =
  2.     DateTimeFormatter
  3.         .ofLocalizedDate(FormatStyle.MEDIUM)
  4.         .withLocale(Locale.GERMAN);
  5. LocalDate xmas = LocalDate.parse(“24.12.2014”, germanFormatter);
  6. System.out.println(xmas);   // 2014-12-24

LocalDateTime

LocalDateTime代表日期和时间。它将我们前部分看到的时间和日期组合进一个实例。LocalDateTime是不可变的并且它的工作原理和LocalTime和LocalDate十分相似。
我们可以从date-time中获取某些字段值:
  1. LocalDateTime sylvester = LocalDateTime.of(2014, Month.DECEMBER, 31235959);
  2. DayOfWeek dayOfWeek = sylvester.getDayOfWeek();
  3. System.out.println(dayOfWeek);      // WEDNESDAY
  4. Month month = sylvester.getMonth();
  5. System.out.println(month);          // DECEMBER
  6. long minuteOfDay = sylvester.getLong(ChronoField.MINUTE_OF_DAY);
  7. System.out.println(minuteOfDay);    // 1439

在一些额外的时区信息帮助下,它可以被转换为instant。Instants可以被容易的转换为遗留的java.util.Date类型。

  1. Instant instant = sylvester
  2.         .atZone(ZoneId.systemDefault())
  3.         .toInstant();
  4. Date legacyDate = Date.from(instant);
  5. System.out.println(legacyDate);     // Wed Dec 31 23:59:59 CET 2014

格式date-time的过程和格式date和time基本上是一样的。在使用系统自带的定义格式时,我们也可以定义我们自己的格式:

  1. DateTimeFormatter formatter =
  2.     DateTimeFormatter
  3.         .ofPattern(“MMM dd, yyyy – HH:mm”);
  4. LocalDateTime parsed = LocalDateTime.parse(“Nov 03, 2014 – 07:13”, formatter);
  5. String string = formatter.format(parsed);
  6. System.out.println(string);     // Nov 03, 2014 – 07:13

和java.text.NumberFormat不一样的是DateTimeFormatter是不可变的并且是类型安全的。

如果想了解详细的格式语法,可以阅读这里

Annotations

Java8中的Annotations是可重复。现在我们深入的学习一个例子来理解它。
首先,我们定义一个包装注解,它包含了一个实际注解的数组。
  1. @interface Hints {
  2.     Hint[] value();
  3. }
  4. @Repeatable(Hints.class)
  5. @interface Hint {
  6.     String value();
  7. }

Java8可以使同一个注解类型同时使用多次,只要我们在注解声明时使用@Repeatable。

情景1:使用容器注解
  1. @Hints({@Hint(“hint1”), @Hint(“hint2”)})
  2. class Person {}

情景2:使用可重复注解

  1. @Hint(“hint1”)
  2. @Hint(“hint2”)
  3. class Person {}

在第二种情景下,java编译器隐式的在该注解使用中加入@Hints。这种后期处理在通过反射获取注解是十分重要的。

  1. Hint hint = Person.class.getAnnotation(Hint.class);
  2. System.out.println(hint);                   // null
  3. Hints hints1 = Person.class.getAnnotation(Hints.class);
  4. System.out.println(hints1.value().length);  // 2
  5. Hint[] hints2 = Person.class.getAnnotationsByType(Hint.class);
  6. System.out.println(hints2.length);          // 2

虽然我们从来没有在类Person上声明@Hints注解,但该信息还是可以通过getAnnotation(Hint.class)获得。 此外,getAnnotationsByType是一种更加便利的方法,它可以保证我们访问所有使用的@Hint注解。

此外,Java8中注解的使用范围扩展到两种新的类型:
  1. @Target({ElementType.TYPE_PARAMETER, ElementType.TYPE_USE})
  2. @interface MyAnnotation {}

总结

我的Java8语言特性教程到此就结束了。此外还有很多新的内容需要阐述。去不去了解JDK8中的这些非常棒的特性取决于你,这些特性包括有Arrays.parallelSort,StampedLock,CompletableFuture  —即使列举名字也有很多了。我已经在网站上把这些特性都列举出来了,你可以去哪里看看。
原文地址: http://winterbe.com/posts/2014/03/16/java-8-tutorial/

对于 Netty ByteBuf 的零拷贝(Zero Copy) 的理解

根据 Wiki 对 Zero-copy 的定义:

“Zero-copy” describes computer operations in which the CPU does not perform the task of copying data from one memory area to another. This is frequently used to save CPU cycles and memory bandwidth when transmitting a file over a network.

即所谓的 Zero-copy, 就是在操作数据时, 不需要将数据 buffer 从一个内存区域拷贝到另一个内存区域. 因为少了一次内存的拷贝, 因此 CPU 的效率就得到的提升.

在 OS 层面上的 Zero-copy 通常指避免在 用户态(User-space)内核态(Kernel-space) 之间来回拷贝数据. 例如 Linux 提供的 mmap 系统调用, 它可以将一段用户空间内存映射到内核空间, 当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间; 同样地, 内核空间对这段区域的修改也直接反映用户空间. 正因为有这样的映射关系, 我们就不需要在 用户态(User-space)内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率.

而需要注意的是, Netty 中的 Zero-copy 与上面我们所提到到 OS 层面上的 Zero-copy 不太一样, Netty的 Zero-coyp 完全是在用户态(Java 层面)的, 它的 Zero-copy 的更多的是偏向于 优化数据操作 这样的概念.

Netty 的 Zero-copy 体现在如下几个个方面:

  • Netty 提供了 CompositeByteBuf 类, 它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf, 避免了各个 ByteBuf 之间的拷贝.
  • 通过 wrap 操作, 我们可以将 byte[] 数组、ByteBuf、ByteBuffer等包装成一个 Netty ByteBuf 对象, 进而避免了拷贝操作.
  • ByteBuf 支持 slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf, 避免了内存的拷贝.
  • 通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输, 可以直接将文件缓冲区的数据发送到目标 Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题.

下面我们就来简单了解一下这几种常见的零拷贝操作.

通过 CompositeByteBuf 实现零拷贝

假设我们有一份协议数据, 它由头部和消息体组成, 而头部和消息体是分别存放在两个 ByteBuf 中的, 即:

ByteBuf header = ...
ByteBuf body = ...

我们在代码处理中, 通常希望将 header 和 body 合并为一个 ByteBuf, 方便处理, 那么通常的做法是:

ByteBuf allBuf = Unpooled.buffer(header.readableBytes() + body.readableBytes());
allBuf.writeBytes(header);
allBuf.writeBytes(body);

可以看到, 我们将 header 和 body 都拷贝到了新的 allBuf 中了, 这无形中增加了两次额外的数据拷贝操作了.

那么有没有更加高效优雅的方式实现相同的目的呢? 我们来看一下 CompositeByteBuf 是如何实现这样的需求的吧.

ByteBuf header = ...
ByteBuf body = ...

CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(true, header, body);

上面代码中, 我们定义了一个 CompositeByteBuf 对象, 然后调用

public CompositeByteBuf addComponents(boolean increaseWriterIndex, ByteBuf... buffers) {
...
}

方法将 headerbody 合并为一个逻辑上的 ByteBuf, 即:

不过需要注意的是, 虽然看起来 CompositeByteBuf 是由两个 ByteBuf 组合而成的, 不过在 CompositeByteBuf 内部, 这两个 ByteBuf 都是单独存在的, CompositeByteBuf 只是逻辑上是一个整体.

上面 CompositeByteBuf 代码还以一个地方值得注意的是, 我们调用 addComponents(boolean increaseWriterIndex, ByteBuf... buffers) 来添加两个 ByteBuf, 其中第一个参数是 true, 表示当添加新的 ByteBuf 时, 自动递增 CompositeByteBuf 的 writeIndex.
如果我们调用的是

compositeByteBuf.addComponents(header, body);

那么其实 compositeByteBufwriteIndex 仍然是0, 因此此时我们就不可能从 compositeByteBuf 中读取到数据, 这一点希望大家要特别注意.

除了上面直接使用 CompositeByteBuf 类外, 我们还可以使用 Unpooled.wrappedBuffer 方法, 它底层封装了 CompositeByteBuf 操作, 因此使用起来更加方便:

ByteBuf header = ...
ByteBuf body = ...

ByteBuf allByteBuf = Unpooled.wrappedBuffer(header, body);

通过 wrap 操作实现零拷贝

例如我们有一个 byte 数组, 我们希望将它转换为一个 ByteBuf 对象, 以便于后续的操作, 那么传统的做法是将此 byte 数组拷贝到 ByteBuf 中, 即:

byte[] bytes = ...
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeBytes(bytes);

显然这样的方式也是有一个额外的拷贝操作的, 我们可以使用 Unpooled 的相关方法, 包装这个 byte 数组, 生成一个新的 ByteBuf 实例, 而不需要进行拷贝操作. 上面的代码可以改为:

byte[] bytes = ...
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);

可以看到, 我们通过 Unpooled.wrappedBuffer 方法来将 bytes 包装成为一个 UnpooledHeapByteBuf 对象, 而在包装的过程中, 是不会有拷贝操作的. 即最后我们生成的生成的 ByteBuf 对象是和 bytes 数组共用了同一个存储空间, 对 bytes 的修改也会反映到 ByteBuf 对象中.

Unpooled 工具类还提供了很多重载的 wrappedBuffer 方法:

public static ByteBuf wrappedBuffer(byte[] array)
public static ByteBuf wrappedBuffer(byte[] array, int offset, int length)

public static ByteBuf wrappedBuffer(ByteBuffer buffer)
public static ByteBuf wrappedBuffer(ByteBuf buffer)

public static ByteBuf wrappedBuffer(byte[]... arrays)
public static ByteBuf wrappedBuffer(ByteBuf... buffers)
public static ByteBuf wrappedBuffer(ByteBuffer... buffers)

public static ByteBuf wrappedBuffer(int maxNumComponents, byte[]... arrays)
public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuf... buffers)
public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuffer... buffers)

这些方法可以将一个或多个 buffer 包装为一个 ByteBuf 对象, 从而避免了拷贝操作.

通过 slice 操作实现零拷贝

slice 操作和 wrap 操作刚好相反, Unpooled.wrappedBuffer 可以将多个 ByteBuf 合并为一个, 而 slice 操作可以将一个 ByteBuf 切片 为多个共享一个存储区域的 ByteBuf 对象.
ByteBuf 提供了两个 slice 操作方法:

public ByteBuf slice();
public ByteBuf slice(int index, int length);

不带参数的 slice 方法等同于 buf.slice(buf.readerIndex(), buf.readableBytes()) 调用, 即返回 buf 中可读部分的切片. 而 slice(int index, int length) 方法相对就比较灵活了, 我们可以设置不同的参数来获取到 buf 的不同区域的切片.

下面的例子展示了 ByteBuf.slice 方法的简单用法:

ByteBuf byteBuf = ...
ByteBuf header = byteBuf.slice(0, 5);
ByteBuf body = byteBuf.slice(5, 10);

slice 方法产生 header 和 body 的过程是没有拷贝操作的, header 和 body 对象在内部其实是共享了 byteBuf 存储空间的不同部分而已. 即:

通过 FileRegion 实现零拷贝

Netty 中使用 FileRegion 实现文件传输的零拷贝, 不过在底层 FileRegion 是依赖于 Java NIO FileChannel.transfer 的零拷贝功能.

首先我们从最基础的 Java IO 开始吧. 假设我们希望实现一个文件拷贝的功能, 那么使用传统的方式, 我们有如下实现:

public static void copyFile(String srcFile, String destFile) throws Exception {
    byte[] temp = new byte[1024];
    FileInputStream in = new FileInputStream(srcFile);
    FileOutputStream out = new FileOutputStream(destFile);
    int length;
    while ((length = in.read(temp)) != -1) {
        out.write(temp, 0, length);
    }

    in.close();
    out.close();
}

上面是一个典型的读写二进制文件的代码实现了. 不用我说, 大家肯定都知道, 上面的代码中不断中源文件中读取定长数据到 temp 数组中, 然后再将 temp 中的内容写入目的文件, 这样的拷贝操作对于小文件倒是没有太大的影响, 但是如果我们需要拷贝大文件时, 频繁的内存拷贝操作就消耗大量的系统资源了.
下面我们来看一下使用 Java NIO 的 FileChannel 是如何实现零拷贝的:

public static void copyFileWithFileChannel(String srcFileName, String destFileName) throws Exception {
    RandomAccessFile srcFile = new RandomAccessFile(srcFileName, "r");
    FileChannel srcFileChannel = srcFile.getChannel();

    RandomAccessFile destFile = new RandomAccessFile(destFileName, "rw");
    FileChannel destFileChannel = destFile.getChannel();

    long position = 0;
    long count = srcFileChannel.size();

    srcFileChannel.transferTo(position, count, destFileChannel);
}

可以看到, 使用了 FileChannel 后, 我们就可以直接将源文件的内容直接拷贝(transferTo) 到目的文件中, 而不需要额外借助一个临时 buffer, 避免了不必要的内存操作.

有了上面的一些理论知识, 我们来看一下在 Netty 中是怎么使用 FileRegion 来实现零拷贝传输一个文件的:

@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    RandomAccessFile raf = null;
    long length = -1;
    try {
        // 1. 通过 RandomAccessFile 打开一个文件.
        raf = new RandomAccessFile(msg, "r");
        length = raf.length();
    } catch (Exception e) {
        ctx.writeAndFlush("ERR: " + e.getClass().getSimpleName() + ": " + e.getMessage() + '\n');
        return;
    } finally {
        if (length < 0 && raf != null) {
            raf.close();
        }
    }

    ctx.write("OK: " + raf.length() + '\n');
    if (ctx.pipeline().get(SslHandler.class) == null) {
        // SSL not enabled - can use zero-copy file transfer.
        // 2. 调用 raf.getChannel() 获取一个 FileChannel.
        // 3. 将 FileChannel 封装成一个 DefaultFileRegion
        ctx.write(new DefaultFileRegion(raf.getChannel(), 0, length));
    } else {
        // SSL enabled - cannot use zero-copy file transfer.
        ctx.write(new ChunkedFile(raf));
    }
    ctx.writeAndFlush("\n");
}

上面的代码是 Netty 的一个例子, 其源码在 netty/example/src/main/java/io/netty/example/file/FileServerHandler.java
可以看到, 第一步是通过 RandomAccessFile 打开一个文件, 然后 Netty 使用了 DefaultFileRegion 来封装一个 FileChannel 即:

new DefaultFileRegion(raf.getChannel(), 0, length)

当有了 FileRegion 后, 我们就可以直接通过它将文件的内容直接写入 Channel 中, 而不需要像传统的做法: 拷贝文件内容到临时 buffer, 然后再将 buffer 写入 Channel. 通过这样的零拷贝操作, 无疑对传输大文件很有帮助.

使用 Spring Webflux 进行异步非阻塞编程

Spring webflux,是在将要发布的Spring 5和Spring boot 2中提供的,结合非阻塞IO,Reactive 风格编程的异步非阻塞开发框架。

之前有一篇文章介绍了Vert.x,初次之外Java中还有Ratpack等以异步非阻塞编程为目标的项目。然而就目前来看,Spring Webflux将会是API设计最良好,最方便使用的一个。

Spring Webflux 介绍

Spring Webflux 是一个基于事件驱动的非阻塞实现,底层可以使用:

  1. Netty。
  2. 支持Servlet3.1 Non-Blocking Servlet标准的Web容器。具体的有Tomcat,Undertow,Jetty等。

默认使用的是Netty。毕竟Servlet整个生态都是针对阻塞IO的实现的,Async Servlet和Non-Blocking Servlet就是在Servlet标准中打的奇怪的格格不入的补丁。在性能上,Netty也有着不小的优势。

Spring Webflux 使用的Reactive System实现是Reactor,但是也支持使用RxJava,还有Java8 CompletableFuture。Reactor和RxJava2.0的实现接口基本一致,然而Reactor是基于Java8实现的,可以利用Java8中的许多既有实现(比如CompletableFuture,Stream等)。Reactor中最常使用的是Publisher的两个实现,Mono和Flux,Mono表示0或1,对应于RxJava中的MayBe,Completable,和Single;Flux表示1+数量,对应于RxJava中的Observable。

Spring Webflux 还提供了一个Netty实现的非阻塞WebClient,用来做Http 请求。

Spring Webflux 实例

我们这里完成一个和之前Vert.x一样功能的简单程序,使用HTTP请求网易新闻头条内容,然后抽取其中的文章标题,并以Json格式返回给客户端。项目使用Spring Boot开发。

@RestController
public class TopLinesHandler {

    @Resource
    private ObjectMapper mapper;

    @GetMapping("/top_lines")
    public Mono<Object> handleGetUserById() {
        return getTopLines().map(this::extractTitles);
    }

    private Mono<String> getTopLines() {
        WebClient webClient = WebClient.create("http://c.m.163.com");
        return webClient.get().uri("/nc/article/headline/T1348647853363/0-20.html")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMap(resp -> resp.bodyToMono(String.class));
    }

    private List<String> extractTitles(String jsonStr) {
        JsonNode jsonNode;
        try {
            jsonNode = mapper.readTree(jsonStr);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        JsonNode articles = jsonNode.get("T1348647853363");
        List<String> list = new ArrayList<>(articles.size());
        for (int i = 0; i < articles.size(); i++) {
            JsonNode article = articles.get(i);
            String title = article.get("title").textValue();
            list.add(title);
        }
        return list;
    }
}

可以看到,在webflux中,也可以使用SpringMVC中定义的注解,这大大简化了路由,response处理等工作。

然后我们需要启动一个Spring Webflux应用程序:

@SpringBootApplication
public class WebfluxApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder()
                .bannerMode(Banner.Mode.OFF)
                .sources(WebfluxApplication.class)
                .run(args);
    }
}

还是熟悉的配方,还是熟悉的味道。

当然,对于JDBC这种只有同步阻塞实现的,还是需要wrap到额外的线程池,以避免阻塞EventLoop,目前Spring Webflux还没有对这些做封装,需要的话只能自己动手了。

现在已经可以使用start.spring.io/方便的创建自己的Spring Webflux项目,注意SpringBoot要选2.0版本,Dependencies里加上Webflux。

Java多线程编程:Callable、Future和FutureTask浅析(多线程编程之四)

Java多线程:Callable、Future和FutureTask浅析(多线程编程之四)

通过前面几篇的学习,我们知道创建线程的方式有两种,一种是实现Runnable接口,另一种是继承Thread,但是这两种方式都有个缺点,那就是在任务执行完成之后无法获取返回结果,那如果我们想要获取返回结果该如何实现呢?还记上一篇Executor框架结构中提到的Callable接口和Future接口吗?,是的,从JAVA SE 5.0开始引入了Callable和Future,通过它们构建的线程,在任务执行完成后就可以获取执行结果,今天我们就来聊聊线程创建的第三种方式,那就是实现Callable接口。

1.Callable<V>接口
我们先回顾一下java.lang.Runnable接口,就声明了run(),其返回值为void,当然就无法获取结果了。
  1. public interface Runnable {
  2.     public abstract void run();
  3. }

而Callable的接口定义如下

  1. public interface Callable<V> {
  2.       V   call()   throws Exception;
  3. }

该接口声明了一个名称为call()的方法,同时这个方法可以有返回值V,也可以抛出异常。嗯,对该接口我们先了解这么多就行,下面我们来说明如何使用,前篇文章我们说过,无论是Runnable接口的实现类还是Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行,ThreadPoolExecutor或ScheduledThreadPoolExecutor都实现了ExcutorService接口,而因此Callable需要和Executor框架中的ExcutorService结合使用,我们先看看ExecutorService提供的方法:

  1. <T> Future<T> submit(Callable<T> task);
  2. <T> Future<T> submit(Runnable task, T result);
  3. Future<?> submit(Runnable task);
第一个方法:submit提交一个实现Callable接口的任务,并且返回封装了异步计算结果的Future。
第二个方法:submit提交一个实现Runnable接口的任务,并且指定了在调用Future的get方法时返回的result对象。
第三个方法:submit提交一个实现Runnable接口的任务,并且返回封装了异步计算结果的Future。
因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。还有点要注意的是,除了我们自己实现Callable对象外,我们还可以使用工厂类Executors来把一个Runnable对象包装成Callable对象。Executors工厂类提供的方法如下:
  1. public static Callable<Object> callable(Runnable task)
  2. public static <T> Callable<T> callable(Runnable task, T result)
2.Future<V>接口
Future<V>接口是用来获取异步计算结果的,说白了就是对具体的Runnable或者Callable对象任务执行的结果进行获取(get()),取消(cancel()),判断是否完成等操作。我们看看Future接口的源码:
  1. public interface Future<V> {
  2.     boolean cancel(boolean mayInterruptIfRunning);
  3.     boolean isCancelled();
  4.     boolean isDone();
  5.     V get() throws InterruptedException, ExecutionException;
  6.     V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
  7. }
方法解析:
V get() :获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。
V get(Long timeout , TimeUnit unit) :获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。
boolean isDone() :如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回true。
boolean isCanceller() :如果任务完成前被取消,则返回true。
boolean cancel(boolean mayInterruptRunning) :如果任务还没开始,执行cancel(…)方法将返回false;如果任务已经启动,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回true;当任务已经启动,执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回false;当任务已经完成,执行cancel(…)方法将返回false。mayInterruptRunning参数表示是否中断执行中的线程。
通过方法分析我们也知道实际上Future提供了3种功能:(1)能够中断执行中的任务(2)判断任务是否执行完成(3)获取任务执行完成后额结果。
但是我们必须明白Future只是一个接口,我们无法直接创建对象,因此就需要其实现类FutureTask登场啦。
3.FutureTask类
我们先来看看FutureTask的实现
  1. public class FutureTask<V> implements RunnableFuture<V> {

FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:

  1. public interface RunnableFuture<V> extends Runnable, Future<V> {
  2.     void run();
  3. }
分析:FutureTask除了实现了Future接口外还实现了Runnable接口,因此FutureTask也可以直接提交给Executor执行。 当然也可以调用线程直接执行(FutureTask.run())。接下来我们根据FutureTask.run()的执行时机来分析其所处的3种状态:
(1)未启动,FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态,当创建一个FutureTask,而且没有执行FutureTask.run()方法前,这个FutureTask也处于未启动状态。
(2)已启动,FutureTask.run()被执行的过程中,FutureTask处于已启动状态。
(3)已完成,FutureTask.run()方法执行完正常结束,或者被取消或者抛出异常而结束,FutureTask都处于完成状态。


下面我们再来看看FutureTask的方法执行示意图(方法和Future接口基本是一样的,这里就不过多描述了)

分析:
(1)当FutureTask处于未启动或已启动状态时,如果此时我们执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或者抛出异常。
(2)当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会执行。
当FutureTask处于已启动状态时,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果任务取消成功,cancel(…)返回true;但如果执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时cancel(…)返回false。
当任务已经完成,执行cancel(…)方法将返回false。
最后我们给出FutureTask的两种构造函数:
  1. public FutureTask(Callable<V> callable) {
  2. }
  3. public FutureTask(Runnable runnable, V result) {
  4. }
3.Callable<V>/Future<V>/FutureTask的使用
通过上面的介绍,我们对Callable,Future,FutureTask都有了比较清晰的了解了,那么它们到底有什么用呢?我们前面说过通过这样的方式去创建线程的话,最大的好处就是能够返回结果,加入有这样的场景,我们现在需要计算一个数据,而这个数据的计算比较耗时,而我们后面的程序也要用到这个数据结果,那么这个时Callable岂不是最好的选择?我们可以开设一个线程去执行计算,而主线程继续做其他事,而后面需要使用到这个数据时,我们再使用Future获取不就可以了吗?下面我们就来编写一个这样的实例
3.1 使用Callable+Future获取执行结果
Callable实现类如下:
  1. package com.zejian.Executor;
  2. import java.util.concurrent.Callable;
  3. public class CallableDemo implements Callable<Integer> {
  4.     private int sum;
  5.     @Override
  6.     public Integer call() throws Exception {
  7.         System.out.println(“Callable子线程开始计算啦!”);
  8.         Thread.sleep(2000);
  9.         for(int i=0 ;i<5000;i++){
  10.             sum=sum+i;
  11.         }
  12.         System.out.println(“Callable子线程计算结束!”);
  13.         return sum;
  14.     }
  15. }

Callable执行测试类如下:

  1. package com.zejian.Executor;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.Future;
  5. public class CallableTest {
  6.     public static void main(String[] args) {
  7.         //创建线程池
  8.         ExecutorService es = Executors.newSingleThreadExecutor();
  9.         //创建Callable对象任务
  10.         CallableDemo calTask=new CallableDemo();
  11.         //提交任务并获取执行结果
  12.         Future<Integer> future =es.submit(calTask);
  13.         //关闭线程池
  14.         es.shutdown();
  15.         try {
  16.             Thread.sleep(2000);
  17.         System.out.println(“主线程在执行其他任务”);
  18.         if(future.get()!=null){
  19.             //输出获取到的结果
  20.             System.out.println(“future.get()–>”+future.get());
  21.         }else{
  22.             //输出获取到的结果
  23.             System.out.println(“future.get()未获取到结果”);
  24.         }
  25.         } catch (Exception e) {
  26.             e.printStackTrace();
  27.         }
  28.         System.out.println(“主线程在执行完成”);
  29.     }
  30. }
执行结果:

Callable子线程开始计算啦!
主线程在执行其他任务
Callable子线程计算结束!
future.get()–>12497500
主线程在执行完成
3.2 使用Callable+FutureTask获取执行结果
  1. package com.zejian.Executor;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.Future;
  5. import java.util.concurrent.FutureTask;
  6. public class CallableTest {
  7.     public static void main(String[] args) {
  8. //      //创建线程池
  9. //      ExecutorService es = Executors.newSingleThreadExecutor();
  10. //      //创建Callable对象任务
  11. //      CallableDemo calTask=new CallableDemo();
  12. //      //提交任务并获取执行结果
  13. //      Future<Integer> future =es.submit(calTask);
  14. //      //关闭线程池
  15. //      es.shutdown();
  16.         //创建线程池
  17.         ExecutorService es = Executors.newSingleThreadExecutor();
  18.         //创建Callable对象任务
  19.         CallableDemo calTask=new CallableDemo();
  20.         //创建FutureTask
  21.         FutureTask<Integer> futureTask=new FutureTask<>(calTask);
  22.         //执行任务
  23.         es.submit(futureTask);
  24.         //关闭线程池
  25.         es.shutdown();
  26.         try {
  27.             Thread.sleep(2000);
  28.         System.out.println(“主线程在执行其他任务”);
  29.         if(futureTask.get()!=null){
  30.             //输出获取到的结果
  31.             System.out.println(“futureTask.get()–>”+futureTask.get());
  32.         }else{
  33.             //输出获取到的结果
  34.             System.out.println(“futureTask.get()未获取到结果”);
  35.         }
  36.         } catch (Exception e) {
  37.             e.printStackTrace();
  38.         }
  39.         System.out.println(“主线程在执行完成”);
  40.     }
  41. }
执行结果:
Callable子线程开始计算啦!
主线程在执行其他任务
Callable子线程计算结束!
futureTask.get()–>12497500
主线程在执行完成
主要参考资料:
java并发编程的艺术

java/android线程池-Executor框架之ThreadPoolExcutor/ScheduledThreadPoolExecutor浅析(多线程编程之三)

无论是在java还是在android中其实使用到的线程池都基本是一样的,因此本篇我们将来认识一下线程池Executor框架(相关知识点结合了并发编程艺术书以及Android开发艺术探索而总结),下面是本篇的主要知识点:

1.Executor框架浅析
首先我们得明白一个 问题,为什么需要线程池?在java中,使用线程来执行异步任务时,线程的创建和销毁需要一定的开销,如果我们为每一个任务创建一个新的线程来执行的话,那么这些线程的创建与销毁将消耗大量的计算资源。同时为每一个任务创建一个新线程来执行,这样的方式可能会使处于高负荷状态的应用最终崩溃。所以线程池的出现为解决这个问题带来曙光。我们将在线程池中创建若干条线程,当有任务需要执行时就从该线程池中获取一条线程来执行任务,如果一时间任务过多,超出线程池的线程数量,那么后面的线程任务就进入一个等待队列进行等待,直到线程池有线程处于空闲时才从等待队列获取要执行的任务进行处理,以此循环…..这样就大大减少了线程创建和销毁的开销,也会缓解我们的应用处于超负荷时的情况。
1.1Executor框架的两级调度模型
在java线程启动时会创建一个本地操作系统线程,当该java线程终止时,这个操作系统线程也会被回收。而每一个java线程都会被一对一映射为本地操作系统的线程,操作系统会调度所有的线程并将它们分别给可用的CPU。而所谓的映射方式是这样实现的,在上层,java多线程程序通过把应用分为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这样种两级调度模型如下图所示:

从图中我们可以看出,应用程序通过Executor框架控制上层的调度,而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
1.2 Executor框架的结构
Executor框架的结构主要包括3个部分
1.任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口
2.任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的EexcutorService接口。Exrcutor有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
3.异步计算的结果:包括接口Future和实现Future接口的FutureTask类(这个我们放在下一篇文章说明)
下面我们通过一个UML图来认识一下这些类间的关系:

Extecutor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或者ScheduledThreadPoolExecutor执行。区别就是Runnable无法返回执行结果,而Callable可以返回执行结果。
下面我们通过一张图来理解它们间的执行关系:

分析说明:
主线程首先创建实现Runnable或Callable接口的任务对象,工具类Executors可以把一个Runnable对象封装为一个Callable对象,使用如下两种方式:
Executors.callable(Runnable task)或者Executors.callable(Runnable task,Object resule)。
然后可以把Runnable对象直接提交给ExecutorService执行,方法为ExecutorService.execute(Runnable command);或者也可以把Runnable对象或者Callable对象提交给ExecutorService执行,方法为ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable<T> task)。这里需要注意的是如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(其实就是FutureTask)。当然由于FutureTask实现了Runnable接口,我们也可以直接创建FutureTask,然后提交给ExecutorService执行。到此Executor框架的主要体系结构我们都介绍完了,我们对此有了大概了解后,下面我们就重点聊聊两个主要的线程池实现类。
2.ThreadPoolExecutor浅析
ThreadPoolExecutor是线程的真正实现,通常使用工厂类Executors来创建,但它的构造方法提供了一系列参数来配置线程池,下面我们就先介绍ThreadPoolExecutor的构造方法中各个参数的含义。
  1. public ThreadPoolExecutor(int corePoolSize,
  2.                               int maximumPoolSize,
  3.                               long keepAliveTime,
  4.                               TimeUnit unit,
  5.                               BlockingQueue<Runnable> workQueue,
  6.                               ThreadFactory threadFactory) {
  7.         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  8.              threadFactory, defaultHandler);
  9.     }
corePoolSize:线程池的核心线程数,默认情况下,核心线程数会一直在线程池中存活,即使它们处理闲置状态。如果将ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,那么闲置的核心线程在等待新任务到来时会执行超时策略,这个时间间隔由keepAliveTime所指定,当等待时间超过keepAliveTime所指定的时长后,核心线程就会被终止。
maximumPoolSize:线程池所能容纳的最大线程数量,当活动线程数到达这个数值后,后续的新任务将会被阻塞。
keepAliveTime:非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收。当ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true时,keepAliveTime同样会作用于核心线程。
unit:用于指定keepAliveTime参数的时间单位,这是一个枚举,常用的有TimeUnit.MILLISECONDS(毫秒),TimeUnit.SECONDS(秒)以及TimeUnit.MINUTES(分钟)等。
workQueue:线程池中的任务队列,通过线程池的execute方法提交Runnable对象会存储在这个队列中。
threadFactory:线程工厂,为线程池提供创建新线程的功能。ThreadFactory是一个接口,它只有一个方法:Thread newThread(Runnable r)。
除了上面的参数外还有个不常用的参数,RejectExecutionHandler,这个参数表示当ThreadPoolExecutor已经关闭或者ThreadPoolExecutor已经饱和时(达到了最大线程池大小而且工作队列已经满),execute方法将会调用Handler的rejectExecution方法来通知调用者,默认情况 下是抛出一个RejectExecutionException异常。了解完相关构造函数的参数,我们再来看看ThreadPoolExecutor执行任务时的大致规则:
(1)如果线程池的数量还未达到核心线程的数量,那么会直接启动一个核心线程来执行任务
(2)如果线程池中的线程数量已经达到或者超出核心线程的数量,那么任务会被插入到任务队列中排队等待执行。
(3)如果在步骤(2)中无法将任务插入到任务队列中,这往往是由于任务队列已满,这个时候如果线程数量未达到线程池规定的最大值,那么会立刻启动一个非核心线程来执行任务。
(4)如果在步骤(3)中线程数量已经达到线程池规定的最大值,那么就会拒绝执行此任务,ThreadPoolExecutor会调用RejectExecutionHandler的rejectExecution方法来通知调用者。
到此ThreadPoolExecutor的详细配置了解完了,ThreadPoolExecutor的执行规则也了解完了,那么接下来我们就来介绍3种常见的线程池,它们都直接或者间接地通过配置ThreadPoolExecutor来实现自己的功能特性,这个3种线程池分别是FixedThreadPool,CachedThreadPool,ScheduledThreadPool以及SingleThreadExecutor。
2.1FixedThreadPool
 FixedThreadPool模式会使用一个优先固定数目的线程来处理若干数目的任务。规定数目的线程处理所有任务,一旦有线程处理完了任务就会被用来处理新的任务(如果有的话)。FixedThreadPool模式下最多的线程数目是一定的。创建FixedThreadPool对象代码如下:
  1. ExecutorService fixedThreadPool=Executors.newFixedThreadPool(5);

我们来看看FixedThreadPool创建方法源码:

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2.         return new ThreadPoolExecutor(nThreads, nThreads,
  3.                                       0L, TimeUnit.MILLISECONDS,
  4.                                       new LinkedBlockingQueue<Runnable>());
  5.     }

FixedThreadPool的corePoolSize和maximumPoolSize参数都被设置为nThreads。当线程池中的线程数量大于corePoolSize时,keepAliveTime为非核心空闲线程等待新任务的最长时间,超过这个时间后非核心线程将被终止,这里keepAliveTime设置为0L,就说明非核心线程会立即被终止。事实上这里也没有非核心线程创建,因为核心线程数和最大线程数都一样的。下面我们来看看FixedThreadPool的execute()方法的运行流程:

分析:
(1)如果当前运行线程数少corePoolSize,则创建一个新的线程来执行任务。
(2)如果当前线程池的运行线程数等于corePoolSize,那么后面提交的任务将加入LinkedBlockingQueue。
(3)线程在执行完图中的1后,会在循环中反复从LinkedBlockingQueue获取任务来执行。
这里还有点要说明的是FixedThreadPool使用的是无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE)。使用该队列作为工作队列会对线程池产生如下影响
(1)当前线程池中的线程数量达到corePoolSize后,新的任务将在无界队列中等待。
(2)由于我们使用的是无界队列,所以参数maximumPoolSize和keepAliveTime无效。
(3)由于使用无界队列,运行中的FixedThreadPool不会拒绝任务(当然此时是未执行shutdown和shutdownNow方法),所以不会去调用RejectExecutionHandler的rejectExecution方法抛出异常。
下面我们给出案例,该案例来自java编程思想一书:
  1. public class LiftOff implements Runnable{
  2.     protected int countDown = 10//Default   
  3.     private static int taskCount = 0;
  4.     private final int id = taskCount++;
  5.     public LiftOff() {}
  6.     public LiftOff(int countDown) {
  7.         this.countDown = countDown;
  8.     }
  9.     public String status() {
  10.         return “#” + id + “(“ +
  11.             (countDown > 0 ? countDown : “LiftOff!”) + “) “;
  12.     }
  13.     @Override
  14.     public void run() {
  15.         while(countDown– > 0) {
  16.             System.out.print(status());
  17.             Thread.yield();
  18.         }
  19.     }
  20. }

声明一个Runnable对象,使用FixedThreadPool执行任务如下:

  1. public class FixedThreadPool {
  2.     public static void main(String[] args) {
  3.         //三个线程来执行五个任务   
  4.         ExecutorService exec = Executors.newFixedThreadPool(3);
  5.         for(int i = 0; i < 5; i++) {
  6.             exec.execute(new LiftOff());
  7.         }
  8.         exec.shutdown();
  9.     }
  10. }
2.2 CachedThreadPool
CachedThreadPool首先会按照需要创建足够多的线程来执行任务(Task)。随着程序执行的过程,有的线程执行完了任务,可以被重新循环使用时,才不再创建新的线程来执行任务。创建方式:
  1. ExecutorService cachedThreadPool=Executors.newCachedThreadPool();
  1. public static ExecutorService newCachedThreadPool() {
  2.         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3.                                       60L, TimeUnit.SECONDS,
  4.                                       new SynchronousQueue<Runnable>());
  5.     }

从该静态方法,我们可以看到CachedThreadPool的corePoolSize被设置为0,而maximumPoolSize被设置Integer.MAX_VALUE,即maximumPoolSize是无界的,而keepAliveTime被设置为60L,单位为妙。也就是空闲线程等待时间最长为60秒,超过该时间将会被终止。而且在这里CachedThreadPool使用的是没有容量的SynchronousQueue作为线程池的工作队列,但其maximumPoolSize是无界的,也就是意味着如果主线程提交任务的速度高于maximumPoolSize中线程处理任务的速度时CachedThreadPool将会不断的创建新的线程,在极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。CachedThreadPool的execute()方法的运行流程:

分析:
(1)首先执行SynchronousQueue.offer(Runnable task),添加一个任务。如果当前CachedThreadPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),其中NANOSECONDS是毫微秒即十亿分之一秒(就是微秒/1000),那么主线程执行offer操作与空闲线程执行poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则进入第(2)步。
(2)当CachedThreadPool初始线程数为空时,或者当前没有空闲线程,将没有线程去执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这样的情况下,步骤(1)将会失败,此时CachedThreadPool会创建一个新的线程来执行任务,execute()方法执行完成。
(3)在步骤(2)中创建的新线程将任务执行完成后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒,如果60秒内主线程提交了一个新任务,那么这个空闲线程将会执行主线程提交的新任务,否则,这个空闲线程将被终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的 CachedThreadPool是不会使用任何资源的。
根据前面的分析我们知道SynchronousQueue是一个没有容量的阻塞队列(其实个人认为是相对应时间而已的没有容量,因为时间到空闲线程就会被移除)。每个插入操作必须等到一个线程与之对应。CachedThreadPool使用SynchronousQueue,把主线程的任务传递给空闲线程执行。流程如下:


CachedThreadPool使用的案例代码如下:

  1. public class CachedThreadPool {
  2.     public static void main(String[] args) {
  3.         ExecutorService exec = Executors.newCachedThreadPool();
  4.         for(int i = 0; i < 10; i++) {
  5.             exec.execute(new LiftOff());
  6.         }
  7.         exec.shutdown();
  8.     }
  9. }
2.3 SingleThreadExecutor

SingleThreadExecutor模式只会创建一个线程。它和FixedThreadPool比较类似,不过线程数是一个。如果多个任务被提交给SingleThreadExecutor的话,那么这些任务会被保存在一个队列中,并且会按照任务提交的顺序,一个先执行完成再执行另外一个线程。SingleThreadExecutor模式可以保证只有一个任务会被执行。这种特点可以被用来处理共享资源的问题而不需要考虑同步的问题。

创建方式:
  1. ExecutorService singleThreadExecutor=Executors.newSingleThreadExecutor();
  1. public static ExecutorService newSingleThreadExecutor() {
  2.         return new FinalizableDelegatedExecutorService
  3.             (new ThreadPoolExecutor(11,
  4.                                     0L, TimeUnit.MILLISECONDS,
  5.                                     new LinkedBlockingQueue<Runnable>()));
  6.     }

从静态方法可以看出SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,其他参数则与FixedThreadPool相同。SingleThreadExecutor使用的工作队列也是无界队列LinkedBlockingQueue。由于SingleThreadExecutor采用无界队列的对线程池的影响与FixedThreadPool一样,这里就不过多描述了。同样的我们先来看看其运行流程:

分析:
(1)如果当前线程数少于corePoolSize即线程池中没有线程运行,则创建一个新的线程来执行任务。
(2)在线程池的线程数量等于corePoolSize时,将任务加入到LinkedBlockingQueue。
(3)线程执行完成(1)中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。
SingleThreadExecutor使用的案例代码如下:
  1. public class SingleThreadExecutor {
  2.     public static void main(String[] args) {
  3.         ExecutorService exec = Executors.newSingleThreadExecutor();
  4.         for (int i = 0; i < 2; i++) {
  5.             exec.execute(new LiftOff());
  6.         }
  7.     }
  8. }
2.4 各自的适用场景
FixedThreadPool:适用于为了满足资源管理需求,而需要限制当前线程的数量的应用场景,它适用于负载比较重的服务器。
SingleThreadExecutor:适用于需要保证执行顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的场景。
CachedThreadPool:大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器。
3.ScheduledThreadPoolExecutor浅析
3.1 ScheduledThreadPoolExecutor执行机制分析
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后执行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但比Timer更强大,更灵活,Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。接下来我们先来了解一下ScheduledThreadPoolExecutor的运行机制:

分析:DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中无意义。ScheduledThreadPoolExecutor的执行主要分为以下两个部分
(1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduleFutureTask。
(2)线程池中的线程从DelayQueue中获取ScheduleFutureTask,然后执行任务。
3.2 如何创建ScheduledThreadPoolExecutor?
ScheduledThreadPoolExecutor通常使用工厂类Executors来创建,Executors可以创建两种类型的ScheduledThreadPoolExecutor,如下:
(1)ScheduledThreadPoolExecutor:可以执行并行任务也就是多条线程同时执行。
(2)SingleThreadScheduledExecutor:可以执行单条线程。
创建ScheduledThreadPoolExecutor的方法构造如下:
  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  2. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

创建SingleThreadScheduledExecutor的方法构造如下:

  1. public static ScheduledExecutorService newSingleThreadScheduledExecutor()
  2. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

创建实例对象代码如下:

  1. ScheduledExecutorService scheduledThreadPoolExecutor=Executors.newScheduledThreadPool(5);
  1. ScheduledExecutorService singleThreadScheduledExecutor=Executors.newSingleThreadScheduledExecutor();
3.3 ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor的适用场景
ScheduledThreadPoolExecutor:适用于多个后台线程执行周期性任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。
SingleThreadScheduledExecutor:适用于需要单个后台线程执行周期任务,同时需要保证任务顺序执行的应用场景。
3.4 ScheduledThreadPoolExecutor使用案例
我们创建一个Runnable的对象,然后使用ScheduledThreadPoolExecutor的Scheduled()来执行延迟任务,输出执行时间即可:
我们先来介绍一下该类延迟执行的方法:
  1. public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
参数解析:
command:就是一个实现Runnable接口的类
delay:延迟多久后执行。
unit:用于指定keepAliveTime参数的时间单位,这是一个枚举,常用的有TimeUnit.MILLISECONDS(毫秒),TimeUnit.SECONDS(秒)以及TimeUnit.MINUTES(分钟)等。
这里要注意这个方法会返回ScheduledFuture实例,可以用于获取线程状态信息和延迟时间。
  1. package com.zejian.Executor;
  2. import java.text.ParsePosition;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. /**
  6.  * @author zejian
  7.  * @time 2016年3月14日 下午9:10:41
  8.  * @decrition 创建一个工作线程继承Runnable
  9.  */
  10. public class WorkerThread implements Runnable{
  11.     @Override
  12.     public void run() {
  13.          System.out.println(Thread.currentThread().getName()+” Start. Time = “+getNowDate());
  14.          threadSleep();
  15.          System.out.println(Thread.currentThread().getName()+” End. Time = “+getNowDate());
  16.     }
  17.     /**
  18.      * 睡3秒
  19.      */
  20.     public void threadSleep(){
  21.         try {
  22.             Thread.sleep(3000);
  23.         } catch (InterruptedException e) {
  24.             // TODO Auto-generated catch block
  25.             e.printStackTrace();
  26.         }
  27.     }
  28.      /**
  29.       * 获取现在时间
  30.       * 
  31.       * @return 返回时间类型 yyyy-MM-dd HH:mm:ss
  32.       */
  33.     public static String getNowDate() {
  34.           Date currentTime = new Date();
  35.           SimpleDateFormat formatter;
  36.             formatter = new SimpleDateFormat (“yyyy-MM-dd HH:mm:ss”);
  37.             String ctime = formatter.format(currentTime);
  38.           return ctime;
  39.          }
  40. }

执行类如下:

  1. package com.zejian.Executor;
  2. import java.text.ParsePosition;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.ScheduledExecutorService;
  7. import java.util.concurrent.TimeUnit;
  8. public class ScheduledThreadPoolTest {
  9.     public static void main(String[] args) {
  10.         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
  11.          try {
  12.         //schedule to run after sometime
  13.         System.out.println(“Current Time = “+getNowDate());
  14.         for(int i=0; i<3; i++){
  15.             Thread.sleep(1000);
  16.             WorkerThread worker = new WorkerThread();
  17.             //延迟10秒后执行
  18.             scheduledThreadPool.schedule(worker, 10, TimeUnit.SECONDS);
  19.         }
  20.             Thread.sleep(3000);
  21.         } catch (InterruptedException e) {
  22.             e.printStackTrace();
  23.         }
  24.         scheduledThreadPool.shutdown();
  25.         while(!scheduledThreadPool.isTerminated()){
  26.             //wait for all tasks to finish
  27.         }
  28.         System.out.println(“Finished all threads”);
  29.     }
  30.      /**
  31.       * 获取现在时间
  32.       * 
  33.       * @return 返回时间类型 yyyy-MM-dd HH:mm:ss
  34.       */
  35.      public static String getNowDate() {
  36.       Date currentTime = new Date();
  37.       SimpleDateFormat formatter;
  38.         formatter = new SimpleDateFormat (“yyyy-MM-dd HH:mm:ss”);
  39.         String ctime = formatter.format(currentTime);
  40.       return ctime;
  41.      }
  42. }

运行输入执行结果:


线程任务确实在10秒延迟后才开始执行。这就是schedule()方法的使用。下面我们再介绍2个可用于周期性执行任务的方法。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

scheduleAtFixedRate方法的作用是预定在初始的延迟结束后,周期性地执行给定的任务,周期长度为period,其中initialDelay为初始延迟。

(按照固定的时间来执行,即:到点执行)

  1. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
scheduleWithFixedDelay方法的作用是预定在初始的延迟结束后周期性地执行给定任务,在一次调用完成和下一次调用开始之间有长度为delay的延迟,其中initialDelay为初始延迟(简单说是是等上一个任务结束后,在等固定的时间,然后执行。即:执行完上一个任务后再执行)。
下面给出实现案例代码参考:
  1. package com.zejian.Executor;
  2. import java.util.Date;
  3. import java.util.concurrent.ScheduledThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. public class ScheduledTask {
  6.     public ScheduledThreadPoolExecutor se = new ScheduledThreadPoolExecutor(5);
  7.     public static void main(String[] args) {
  8.         new ScheduledTask();
  9.     }
  10.     public void fixedPeriodSchedule() {
  11.         // 设定可以循环执行的runnable,初始延迟为0,这里设置的任务的间隔为5秒
  12.         for(int i=0;i<5;i++){
  13.             se.scheduleAtFixedRate(new FixedSchedule(), 05, TimeUnit.SECONDS);
  14.         }
  15.     }
  16.     public ScheduledTask() {
  17.         fixedPeriodSchedule();
  18.     }
  19.     class FixedSchedule implements Runnable {
  20.         public void run() {
  21.             System.out.println(“当前线程:”+Thread.currentThread().getName()+”  当前时间:”+new Date(System.currentTimeMillis()));
  22.         }
  23.     }
  24. }
运行结果(后来补贴的结果,所以时间是2017)
  1. 当前线程:pool-1-thread-5  当前时间:Tue Aug 08 09:43:18 CST 2017
  2. 当前线程:pool-1-thread-4  当前时间:Tue Aug 08 09:43:18 CST 2017
  3. 当前线程:pool-1-thread-3  当前时间:Tue Aug 08 09:43:18 CST 2017
  4. 当前线程:pool-1-thread-1  当前时间:Tue Aug 08 09:43:18 CST 2017
  5. 当前线程:pool-1-thread-2  当前时间:Tue Aug 08 09:43:18 CST 2017
  6. 当前线程:pool-1-thread-1  当前时间:Tue Aug 08 09:43:23 CST 2017
  7. 当前线程:pool-1-thread-4  当前时间:Tue Aug 08 09:43:23 CST 2017
  8. 当前线程:pool-1-thread-3  当前时间:Tue Aug 08 09:43:23 CST 2017
  9. 当前线程:pool-1-thread-5  当前时间:Tue Aug 08 09:43:23 CST 2017
  10. 当前线程:pool-1-thread-2  当前时间:Tue Aug 08 09:43:23 CST 2017
  11. 当前线程:pool-1-thread-1  当前时间:Tue Aug 08 09:43:28 CST 2017
  12. 当前线程:pool-1-thread-4  当前时间:Tue Aug 08 09:43:28 CST 2017
  13. 当前线程:pool-1-thread-5  当前时间:Tue Aug 08 09:43:28 CST 2017
  14. 当前线程:pool-1-thread-3  当前时间:Tue Aug 08 09:43:28 CST 2017
  15. 当前线程:pool-1-thread-1  当前时间:Tue Aug 08 09:43:28 CST 2017
至于scheduleWithFixedDelay方法,大家就把代码稍微修改一下执行试试就行,这里就不重复了。而SingleThreadScheduledExecutor的使用的方法基本是类似,只不过是单线程罢了,这里也不再描述了。好了,今天就到这吧。

主要参考书籍:

java核心技术卷1

android开发艺术探索

java并发编程的艺术

java多线程同步以及线程间通信详解&消费者生产者模式&死锁&Thread.join()(多线程编程之二)

本篇我们将讨论以下知识点:

1.线程同步问题的产生

什么是线程同步问题,我们先来看一段卖票系统的代码,然后再分析这个问题:
  1. public class Ticket implements Runnable
  2. {
  3.     //当前拥有的票数
  4.     private  int num = 100;
  5.     public void run()
  6.     {
  7.         while(true)
  8.         {
  9.                 if(num>0)
  10.                 {
  11.                     try{Thread.sleep(10);}catch (InterruptedException e){}
  12.                     //输出卖票信息
  13.                     System.out.println(Thread.currentThread().getName()+“…..sale….”+num–);
  14.                 }
  15.         }
  16.     }
  17. }

上面是卖票线程类,下来再来看看执行类:

  1. public class TicketDemo {
  2.     public static void main(String[] args)
  3.     {
  4.         Ticket t = new Ticket();//创建一个线程任务对象。
  5.         //创建4个线程同时卖票
  6.         Thread t1 = new Thread(t);
  7.         Thread t2 = new Thread(t);
  8.         Thread t3 = new Thread(t);
  9.         Thread t4 = new Thread(t);
  10.         //启动线程
  11.         t1.start();
  12.         t2.start();
  13.         t3.start();
  14.         t4.start();
  15.     }
  16. }

运行程序结果如下(仅截取部分数据):

从运行结果,我们就可以看出我们4个售票窗口同时卖出了1号票,这显然是不合逻辑的,其实这个问题就是我们前面所说的线程同步问题。不同的线程都对同一个数据进了操作这就容易导致数据错乱的问题,也就是线程不同步。那么这个问题该怎么解决呢?在给出解决思路之前我们先来分析一下这个问题是怎么产生的?我们声明一个线程类Ticket,在这个类中我们又声明了一个成员变量num也就是票的数量,然后我们通过run方法不断的去获取票数并输出,最后我们在外部类TicketDemo中创建了四个线程同时操作这个数据,运行后就出现我们刚才所说的线程同步问题,从这里我们可以看出产生线程同步(线程安全)问题的条件有两个:1.多个线程在操作共享的数据(num),2.操作共享数据的线程代码有多条(4条线程);既然原因知道了,那该怎么解决?
解决思路:将多条操作共享数据的线程代码封装起来,当有线程在执行这些代码的时候,其他线程时不可以参与运算的。必须要当前线程把这些代码都执行完毕后,其他线程才可以参与运算。 好了,思路知道了,我们就用java代码的方式来解决这个问题。
2.解决线程同步的两种典型方案
在java中有两种机制可以防止线程安全的发生,Java语言提供了一个synchronized关键字来解决这问题,同时在Java SE5.0引入了Lock锁对象的相关类,接下来我们分别介绍这两种方法
2.1通过锁(Lock)对象的方式解决线程安全问题
在给出解决代码前我们先来介绍一个知识点:Lock,锁对象。在java中锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但有的锁可以允许多个线程并发访问共享资源,比如读写锁,后面我们会分析)。在Lock接口出现之前,java程序是靠synchronized关键字(后面分析)实现锁功能的,而JAVA SE5.0之后并发包中新增了Lock接口用来实现锁的功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁,缺点就是缺少像synchronized那样隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。接下来我们就来介绍Lock接口的主要API方便我们学习
方法 相关描述内容
void lock() 获取锁,调用该方法当前线程会获取锁,当获取锁后。从该方法返回
void lockInterruptibly()
throws InterruptedException
可中断获取锁和lock()方法不同的是该方法会响应中断,即在获取锁
中可以中断当前线程。例如某个线程在等待一个锁的控制权的这段时
间需要中断。
boolean tryLock() 尝试非阻塞获取锁,调用该方法后立即返回,如果能够获取锁则返回
true,否则返回false。
boolean tryLock(long time,TimeUnit unit)
throws  InterruptedException
超时获取锁,当前线程在以下3种情况返回:
1.当前线程在超时时间内获取了锁
2.当前线程在超时时间被中断
3.当前线程超时时间结束,返回false
void unlock() 释放锁
Condition newCondition() 条件对象,获取等待通知组件。该组件和当前的锁绑定,当前线程只有
获取了锁,才能调用该组件的await()方法,而调用后,当前线程将缩放
锁。
这里先介绍一下API,后面我们将结合Lock接口的实现子类ReentrantLock使用某些方法。
ReentrantLock(重入锁):
重入锁,顾名思义就是支持重新进入的锁,它表示该锁能够支持一个线程对资源的重复加锁,也就是说在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞,同时还支持获取锁的公平性和非公平性。这里的公平是在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平锁,反之,是不公平的。那么该如何使用呢?看范例代码:
1.同步执行的代码跟synchronized类似功能:
  1. ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁  
  2. ReentrantLock lock = new ReentrantLock(true); //公平锁  
  3. lock.lock(); //如果被其它资源锁定,会在此等待锁释放,达到暂停的效果  
  4. try {
  5.     //操作  
  6. finally {
  7.     lock.unlock();  //释放锁
  8. }

2.防止重复执行代码:

  1. ReentrantLock lock = new ReentrantLock();
  2. if (lock.tryLock()) {  //如果已经被lock,则立即返回false不会等待,达到忽略操作的效果   
  3.     try {
  4.         //操作  
  5.     } finally {
  6.         lock.unlock();
  7.    }
  8. }

3.尝试等待执行的代码:

  1. ReentrantLock lock = new ReentrantLock(true); //公平锁  
  2. try {
  3.     if (lock.tryLock(5, TimeUnit.SECONDS)) {
  4.         //如果已经被lock,尝试等待5s,看是否可以获得锁,如果5s后仍然无法获得锁则返回false继续执行  
  5.        try {
  6.             //操作  
  7.         } finally {
  8.             lock.unlock();
  9.         }
  10.     }
  11. catch (InterruptedException e) {
  12.     e.printStackTrace(); //当前线程被中断时(interrupt),会抛InterruptedException                   
  13. }

这里有点需要特别注意的,把解锁操作放在finally代码块内这个十分重要。如果在临界区的代码抛出异常,锁必须被释放。否则,其他线程将永远阻塞。好了,ReentrantLock我们就简单介绍到这里,接下来我们通过ReentrantLock来解决前面卖票线程的线程同步(安全)问题,代码如下:

  1. import java.util.concurrent.locks.Lock;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. public class Ticket implements Runnable {
  4.     //创建锁对象
  5.     private Lock ticketLock = new ReentrantLock();
  6.     //当前拥有的票数
  7.     private int num = 100;
  8.     public void run() {
  9.         while (true) {
  10.             try {
  11.                 ticketLock.lock();//获取锁
  12.                 if (num > 0) {
  13.                     Thread.sleep(10);//输出卖票信息System.out.println(Thread.currentThread().getName()+”…..sale….”+num–); }
  14.                 } else {
  15.                     break;
  16.                 }
  17.             } catch (InterruptedException e) {
  18.                 Thread.currentThread().interrupt();//出现异常就中断
  19.             } finally {
  20.                 ticketLock.unlock();//释放锁
  21.             }
  22.         }
  23.     }
  24. }
TicketDemo类无需变化,运行结果正常(太多不贴了),线程安全问题就此解决。
2.2通过synchronied关键字的方式解决线程安全问题
在Java中内置了语言级的同步原语-synchronized,这个可以大大简化了Java中多线程同步的使用。从JAVA SE1.0开始,java中的每一个对象都有一个内部锁,如果一个方法使用synchronized关键字进行声明,那么这个对象将保护整个方法,也就是说调用该方法线程必须获得内部的对象锁。
  1. public synchronized void method{
  2.   //method body
  3. }

等价于

  1. private Lock ticketLock = new ReentrantLock();
  2. public void method{
  3.  ticketLock.lock();
  4.  try{
  5.   //…….
  6.  }finally{
  7.    ticketLock.unlock();
  8.  }
  9. }

从这里可以看出使用synchronized关键字来编写代码要简洁得多了。当然,要理解这一代码,我们必须知道每个对象有一个内部锁,并且该锁有一个内部条件。由锁来管理那些试图进入synchronized方法的线程,由条件来管那些调用wait的线程(wait()/notifyAll/notify())。同时我们必须明白一旦有一个线程通过synchronied方法获取到内部锁,该类的所有synchronied方法或者代码块都无法被其他线程访问直到当前线程释放了内部锁。刚才上面说的是同步方法,synchronized还有一种同步代码块的实现方式:

  1. Object obj = new Object();
  2. synchronized(obj){
  3.   //需要同步的代码
  4. }

其中obj是对象锁,可以是任意对象。那么我们就通过其中的一个方法来解决售票系统的线程同步问题:

  1. class Ticket implements Runnable
  2. {
  3.     private  int num = 100;
  4.     Object obj = new Object();
  5.     public void run()
  6.     {
  7.         while(true)
  8.         {
  9.             synchronized(obj)
  10.             {
  11.                 if(num>0)
  12.                 {
  13.                     try{Thread.sleep(10);}catch (InterruptedException e){}
  14.                     System.out.println(Thread.currentThread().getName()+“…..sale….”+num–);
  15.                 }
  16.             }
  17.         }
  18.     }
  19. }
嗯,同步代码块解决,运行结果也正常。到此同步问题也就解决了,当然代码同步也是要牺牲效率为前提的:
同步的好处:解决了线程的安全问题。
同步的弊端:相对降低了效率,因为同步外的线程的都会判断同步锁。
同步的前提:同步中必须有多个线程并使用同一个锁。
3.线程间的通信机制
线程开始运行,拥有自己的栈空间,但是如果每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者是价值很小,如果多线程能够相互配合完成工作的话,这将带来巨大的价值,这也就是线程间的通信啦。在java中多线程间的通信使用的是等待/通知机制来实现的。
3.1synchronied关键字等待/通知机制:是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述的两个线程通过对象O来完成交互,而对象上的wait()和notify()/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。
等待/通知机制主要是用到的函数方法是notify()/notifyAll(),wait()/wait(long),wait(long,int),这些方法在上一篇文章都有说明过,这里就不重复了。当然这是针对synchronied关键字修饰的函数或代码块,因为要使用notify()/notifyAll(),wait()/wait(long),wait(long,int)这些方法的前提是对调用对象加锁,也就是说只能在同步函数或者同步代码块中使用。
3.2条件对象的等待/通知机制:所谓的条件对象也就是配合前面我们分析的Lock锁对象,通过锁对象的条件对象来实现等待/通知机制。那么条件对象是怎么创建的呢?
  1. //创建条件对象
  2. Condition conditionObj=ticketLock.newCondition();
就这样我们创建了一个条件对象。注意这里返回的对象是与该锁(ticketLock)相关的条件对象。下面是条件对象的API:
方法 函数方法对应的描述
void await() 将该线程放到条件等待池中(对应wait()方法)
void signalAll() 解除该条件等待池中所有线程的阻塞状态(对应notifyAll()方法)
void signal() 从该条件的等待池中随机地选择一个线程,解除其阻塞状态(对应notify()方法)
上述方法的过程分析:一个线程A调用了条件对象的await()方法进入等待状态,而另一个线程B调用了条件对象的signal()或者signalAll()方法,线程A收到通知后从条件对象的await()方法返回,进而执行后续操作。上述的两个线程通过条件对象来完成交互,而对象上的await()和signal()/signalAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。当然这样的操作都是必须基于对象锁的,当前线程只有获取了锁,才能调用该条件对象的await()方法,而调用后,当前线程将缩放锁。
这里有点要特别注意的是,上述两种等待/通知机制中,无论是调用了signal()/signalAll()方法还是调用了notify()/notifyAll()方法并不会立即激活一个等待线程。它们仅仅都只是解除等待线程的阻塞状态,以便这些线程可以在当前线程解锁或者退出同步方法后,通过争夺CPU执行权实现对对象的访问。到此,线程通信机制的概念分析完,我们下面通过生产者消费者模式来实现等待/通知机制。
4.生产者消费者模式
4.1单生产者单消费者模式
顾名思义,就是一个线程消费,一个线程生产。我们先来看看等待/通知机制下的生产者消费者模式:我们假设这样一个场景,我们是卖北京烤鸭店铺,我们现在只有一条生产线也只有一条消费线,也就是说只能生产线程生产完了,再通知消费线程才能去卖,如果消费线程没烤鸭了,就必须通知生产线程去生产,此时消费线程进入等待状态。在这样的场景下,我们不仅要保证共享数据(烤鸭数量)的线程安全,而且还要保证烤鸭数量在消费之前必须有烤鸭。下面我们通过java代码来实现:
北京烤鸭生产资源类KaoYaResource:
  1. public class KaoYaResource {
  2.     private String name;
  3.     private int count = 1;//烤鸭的初始数量
  4.     private boolean flag = false;//判断是否有需要线程等待的标志
  5.     /**
  6.      * 生产烤鸭
  7.      */
  8.     public synchronized void product(String name){
  9.         if(flag){
  10.             //此时有烤鸭,等待
  11.             try {
  12.                 this.wait();
  13.             } catch (InterruptedException e) {
  14.                 e.printStackTrace()
  15. ;
  16.             }
  17.         }
  18.         this.name=name+count;//设置烤鸭的名称
  19.         count++;
  20.         System.out.println(Thread.currentThread().getName()+“…生产者…”+this.name);
  21.         flag=true;//有烤鸭后改变标志
  22.         notifyAll();//通知消费线程可以消费了
  23.     }
  24.     /**
  25.      * 消费烤鸭
  26.      */
  27.     public synchronized void consume(){
  28.         if(flag){//如果没有烤鸭就等待
  29.             try{this.wait();}catch(InterruptedException e){}
  30.         }
  31.         System.out.println(Thread.currentThread().getName()+“…消费者……..”+this.name);//消费烤鸭1
  32.         flag = false;
  33.         notifyAll();//通知生产者生产烤鸭
  34.     }
  35. }
在这个类中我们有两个synchronized的同步方法,一个是生产烤鸭的,一个是消费烤鸭的,之所以需要同步是因为我们操作了共享数据count,同时为了保证生产烤鸭后才能消费也就是生产一只烤鸭后才能消费一只烤鸭,我们使用了等待/通知机制,wait()和notify()。当第一次运行生产现场时调用生产的方法,此时有一只烤鸭,即flag=false,无需等待,因此我们设置可消费的烤鸭名称然后改变flag=true,同时通知消费线程可以消费烤鸭了,即使此时生产线程再次抢到执行权,因为flag=true,所以生产线程会进入等待阻塞状态,消费线程被唤醒后就进入消费方法,消费完成后,又改变标志flag=false,通知生产线程可以生产烤鸭了………以此循环。
生产消费执行类Single_Producer_Consumer.java:
  1. public class Single_Producer_Consumer {
  2.     public static void main(String[] args)
  3.     {
  4.         KaoYaResource r = new KaoYaResource();
  5.         Producer pro = new Producer(r);
  6.         Consumer con = new Consumer(r);
  7.         //生产者线程
  8.         Thread t0 = new Thread(pro);
  9.         //消费者线程
  10.         Thread t2 = new Thread(con);
  11.         //启动线程
  12.         t0.start();
  13.         t2.start();
  14.     }
  15. }
  16. class Producer implements Runnable
  17. {
  18.     private KaoYaResource r;
  19.     Producer(KaoYaResource r)
  20.     {
  21.         this.r = r;
  22.     }
  23.     public void run()
  24.     {
  25.         while(true)
  26.         {
  27.             r.product(“北京烤鸭”);
  28.         }
  29.     }
  30. }
  31. class Consumer implements Runnable
  32. {
  33.     private KaoYaResource r;
  34.     Consumer(KaoYaResource r)
  35.     {
  36.         this.r = r;
  37.     }
  38.     public void run()
  39.     {
  40.         while(true)
  41.         {
  42.             r.consume();
  43.         }
  44.     }
  45. }

在这个类中我们创建两个线程,一个是消费者线程,一个是生产者线程,我们分别开启这两个线程用于不断的生产消费,运行结果如下:

很显然的情况就是生产一只烤鸭然后就消费一只烤鸭。运行情况完全正常,嗯,这就是单生产者单消费者模式。上面使用的是synchronized关键字的方式实现的,那么接下来我们使用对象锁的方式实现:KaoYaResourceByLock.java
  1. public class KaoyaResourceByLock {
  2.     private String name;
  3.     private int count = 1;//烤鸭的初始数量
  4.     private boolean flag = false;//判断是否有需要线程等待的标志
  5.     //创建一个锁对象
  6.     private Lock resourceLock=new ReentrantLock();
  7.     //创建条件对象
  8.     private Condition condition= resourceLock.newCondition();
  9.     /**
  10.      * 生产烤鸭
  11.      */
  12.     public  void product(String name){
  13.         resourceLock.lock();//先获取锁
  14.         try{
  15.             if(flag){
  16.                 try {
  17.                     condition.await();
  18.                 } catch (InterruptedException e) {
  19.                     e.printStackTrace();
  20.                 }
  21.             }
  22.             this.name=name+count;//设置烤鸭的名称
  23.             count++;
  24.             System.out.println(Thread.currentThread().getName()+“…生产者…”+this.name);
  25.             flag=true;//有烤鸭后改变标志
  26.             condition.signalAll();//通知消费线程可以消费了
  27.         }finally{
  28.             resourceLock.unlock();
  29.         }
  30.     }
  31.     /**
  32.      * 消费烤鸭
  33.      */
  34.     public  void consume(){
  35.         resourceLock.lock();
  36.         try{
  37.         if(!flag){//如果没有烤鸭就等待
  38.             try{condition.await();}catch(InterruptedException e){}
  39.         }
  40.         System.out.println(Thread.currentThread().getName()+“…消费者……..”+this.name);//消费烤鸭1
  41.         flag = false;
  42.         condition.signalAll();//通知生产者生产烤鸭
  43.         }finally{
  44.             resourceLock.unlock();
  45.         }
  46.     }
  47. }
代码变化不大,我们通过对象锁的方式去实现,首先要创建一个对象锁,我们这里使用的重入锁ReestrantLock类,然后通过手动设置lock()和unlock()的方式去获取锁以及释放锁。为了实现等待/通知机制,我们还必须通过锁对象去创建一个条件对象Condition,然后通过锁对象的await()和signalAll()方法去实现等待以及通知操作。Single_Producer_Consumer.java代码替换一下资源类即可,运行结果就不贴了,有兴趣自行操作即可。
4.2多生产者多消费者模式
分析完了单生产者单消费者模式,我们再来聊聊多生产者多消费者模式,也就是多条生产线程配合多条消费线程。既然这样的话我们先把上面的代码Single_Producer_Consumer.java类修改成新类,大部分代码不变,仅新增2条线程去跑,一条t1的生产  共享资源类KaoYaResource不作更改,代码如下:
  1. public class Mutil_Producer_Consumer {
  2.     public static void main(String[] args)
  3.     {
  4.         KaoYaResource r = new KaoYaResource();
  5.         Mutil_Producer pro = new Mutil_Producer(r);
  6.         Mutil_Consumer con = new Mutil_Consumer(r);
  7.         //生产者线程
  8.         Thread t0 = new Thread(pro);
  9.         Thread t1 = new Thread(pro);
  10.         //消费者线程
  11.         Thread t2 = new Thread(con);
  12.         Thread t3 = new Thread(con);
  13.         //启动线程
  14.         t0.start();
  15.         t1.start();
  16.         t2.start();
  17.         t3.start();
  18.     }
  19. class Mutil_Producer implements Runnable
  20. {
  21.     private KaoYaResource r;
  22.     Mutil_Producer(KaoYaResource r)
  23.     {
  24.         this.r = r;
  25.     }
  26.     public void run()
  27.     {
  28.         while(true)
  29.         {
  30.             r.product(“北京烤鸭”);
  31.         }
  32.     }
  33. }
  34. class Mutil_Consumer implements Runnable
  35. {
  36.     private KaoYaResource r;
  37.     Mutil_Consumer(KaoYaResource r)
  38.     {
  39.         this.r = r;
  40.     }
  41.     public void run()
  42.     {
  43.         while(true)
  44.         {
  45.             r.consume();
  46.         }
  47.     }
  48. }

就多了两条线程,我们运行代码看看,结果如下:


不对呀,我们才生产一只烤鸭,怎么就被消费了3次啊,有的烤鸭生产了也没有被消费啊?难道共享数据源没有进行线程同步?我们再看看之前的KaoYaResource.java
  1. public class KaoYaResource {
  2.     private String name;
  3.     private int count = 1;//烤鸭的初始数量
  4.     private boolean flag = false;//判断是否有需要线程等待的标志
  5.     /**
  6.      * 生产烤鸭
  7.      */
  8.     public synchronized void product(String name){
  9.         if(flag){
  10.             //此时有烤鸭,等待
  11.             try {
  12.                 this.wait();
  13.             } catch (InterruptedException e) {
  14.                 e.printStackTrace();
  15.             }
  16.         }
  17.         this.name=name+count;//设置烤鸭的名称
  18.         count++;
  19.         System.out.println(Thread.currentThread().getName()+“…生产者…”+this.name);
  20.         flag=true;//有烤鸭后改变标志
  21.         notifyAll();//通知消费线程可以消费了
  22.     }
  23.     /**
  24.      * 消费烤鸭
  25.      */
  26.     public synchronized void consume(){
  27.         if(!flag){//如果没有烤鸭就等待
  28.             try{this.wait();}catch(InterruptedException e){}
  29.         }
  30.         System.out.println(Thread.currentThread().getName()+“…消费者……..”+this.name);//消费烤鸭1
  31.         flag = false;
  32.         notifyAll();//通知生产者生产烤鸭
  33.     }
  34. }
共享数据count的获取方法都进行synchronized关键字同步了呀!那怎么还会出现数据混乱的现象啊?
分析:确实,我们对共享数据也采用了同步措施,而且也应用了等待/通知机制,但是这样的措施只在单生产者单消费者的情况下才能正确应用,但从运行结果来看,我们之前的单生产者单消费者安全处理措施就不太适合多生产者多消费者的情况了。那么问题出在哪里?可以明确的告诉大家,肯定是在资源共享类,下面我们就来分析问题是如何出现,又该如何解决?直接上图


解决后的资源代码如下只将if改为了while:

  1. public class KaoYaResource {
  2.     private String name;
  3.     private int count = 1;//烤鸭的初始数量
  4.     private boolean flag = false;//判断是否有需要线程等待的标志
  5.     /**
  6.      * 生产烤鸭
  7.      */
  8.     public synchronized void product(String name){
  9.         while(flag){
  10.             //此时有烤鸭,等待
  11.             try {
  12.                 this.wait();
  13.             } catch (InterruptedException e) {
  14.                 e.printStackTrace();
  15.             }
  16.         }
  17.         this.name=name+count;//设置烤鸭的名称
  18.         count++;
  19.         System.out.println(Thread.currentThread().getName()+“…生产者…”+this.name);
  20.         flag=true;//有烤鸭后改变标志
  21.         notifyAll();//通知消费线程可以消费了
  22.     }
  23.     /**
  24.      * 消费烤鸭
  25.      */
  26.     public synchronized void consume(){
  27.         while(!flag){//如果没有烤鸭就等待
  28.             try{this.wait();}catch(InterruptedException e){}
  29.         }
  30.         System.out.println(Thread.currentThread().getName()+“…消费者……..”+this.name);//消费烤鸭1
  31.         flag = false;
  32.         notifyAll();//通知生产者生产烤鸭
  33.     }
  34. }

运行代码,结果如下:


到此,多消费者多生产者模式也完成,不过上面用的是synchronied关键字实现的,而锁对象的解决方法也一样将之前单消费者单生产者的资源类中的if判断改为while判断即可代码就不贴了哈。不过下面我们将介绍一种更有效的锁对象解决方法,我们准备使用两组条件对象(Condition也称为监视器)来实现等待/通知机制,也就是说通过已有的锁获取两组监视器,一组监视生产者,一组监视消费者。有了前面的分析这里我们直接上代码:
  1. public class ResourceBy2Condition {
  2.     private String name;
  3.     private int count = 1;
  4.     private boolean flag = false;
  5.     //创建一个锁对象。
  6.     Lock lock = new ReentrantLock();
  7.     //通过已有的锁获取两组监视器,一组监视生产者,一组监视消费者。
  8.     Condition producer_con = lock.newCondition();
  9.     Condition consumer_con = lock.newCondition();
  10.     /**
  11.      * 生产
  12.      * @param name
  13.      */
  14.     public  void product(String name)
  15.     {
  16.         lock.lock();
  17.         try
  18.         {
  19.             while(flag){
  20.                 try{producer_con.await();}catch(InterruptedException e){}
  21.             }
  22.             this.name = name + count;
  23.             count++;
  24.             System.out.println(Thread.currentThread().getName()+“…生产者5.0…”+this.name);
  25.             flag = true;
  26. //          notifyAll();
  27. //          con.signalAll();
  28.             consumer_con.signal();//直接唤醒消费线程
  29.         }
  30.         finally
  31.         {
  32.             lock.unlock();
  33.         }
  34.     }
  35.     /**
  36.      * 消费
  37.      */
  38.     public  void consume()
  39.     {
  40.         lock.lock();
  41.         try
  42.         {
  43.             while(!flag){
  44.                 try{consumer_con.await();}catch(InterruptedException e){}
  45.             }
  46.             System.out.println(Thread.currentThread().getName()+“…消费者.5.0…….”+this.name);//消费烤鸭1
  47.             flag = false;
  48. //          notifyAll();
  49. //          con.signalAll();
  50.             producer_con.signal();//直接唤醒生产线程
  51.         }
  52.         finally
  53.         {
  54.             lock.unlock();
  55.         }
  56.     }
  57. }
从代码中可以看到,我们创建了producer_con 和consumer_con两个条件对象,分别用于监听生产者线程和消费者线程,在product()方法中,我们获取到锁后,
如果此时flag为true的话,也就是此时还有烤鸭未被消费,因此生产线程需要等待,所以我们调用生产线程的监控器producer_con的
await()的方法进入阻塞等待池;但如果此时的flag为false的话,就说明烤鸭已经消费完,需要生产线程去生产烤鸭,那么生产线程将进行烤
鸭生产并通过消费线程的监控器consumer_con的signal()方法去通知消费线程对烤鸭进行消费。consume()方法也是同样的道理,这里就不
过多分析了。我们可以发现这种方法比我们之前的synchronized同步方法或者是单监视器的锁对象都来得高效和方便些,之前都是使用
notifyAll()和signalAll()方法去唤醒池中的线程,然后让池中的线程又进入 竞争队列去抢占CPU资源,这样不仅唤醒了无关的线程而且又让全
部线程进入了竞争队列中,而我们最后使用两种监听器分别监听生产者线程和消费者线程,这样的方式恰好解决前面两种方式的问题所在,
我们每次唤醒都只是生产者线程或者是消费者线程而不会让两者同时唤醒,这样不就能更高效得去执行程序了吗?好了,到此多生产者多消
费者模式也分析完毕。
5.线程死锁
现在我们再来讨论一下线程死锁问题,从上面的分析,我们知道锁是个非常有用的工具,运用的场景非常多,因为它使用起来非常简单,而
且易于理解。但同时它也会带来一些不必要的麻烦,那就是可能会引起死锁,一旦产生死锁,就会造成系统功能不可用。我们先通过一个例
子来分析,这个例子会引起死锁,使得线程t1和线程t2互相等待对方释放锁。
  1. public class DeadLockDemo {
  2.     private static String A=“A”;
  3.     private static String B=“B”;
  4.     public static void main(String[] args) {
  5.         DeadLockDemo deadLock=new DeadLockDemo();
  6.         while(true){
  7.             deadLock.deadLock();
  8.         }
  9.     }
  10.     private void deadLock(){
  11.         Thread t1=new Thread(new Runnable(){
  12.             @SuppressWarnings(“static-access”)
  13.             @Override
  14.             public void run() {
  15.                 synchronized (A) {
  16.                     try {
  17.                         Thread.currentThread().sleep(2000);
  18.                     } catch (InterruptedException e) {
  19.                         e.printStackTrace();
  20.                     }
  21.                 }
  22.                 synchronized(B){
  23.                     System.out.println(“1”);
  24.                 }
  25.             }
  26.         });
  27.         Thread t2 =new Thread(new Runnable() {
  28.             @Override
  29.             public void run() {
  30.                 synchronized (B) {
  31.                     synchronized (A) {
  32.                         System.out.println(“2”);
  33.                     }
  34.                 }
  35.             }
  36.         });
  37.         //启动线程
  38.         t1.start();
  39.         t2.start();
  40.     }
  41. }
同步嵌套是产生死锁的常见情景,从上面的代码中我们可以看出,当t1线程拿到锁A后,睡眠2秒,此时线程t2刚好拿到了B锁,接着要获取A锁,但是此时A锁正好被t1线程持有,因此只能等待t1线程释放锁A,但遗憾的是在t1线程内又要求获取到B锁,而B锁此时又被t2线程持有,到此结果就是t1线程拿到了锁A同时在等待t2线程释放锁B,而t2线程获取到了锁B也同时在等待t1线程释放锁A,彼此等待也就造成了线程死锁问题。虽然我们现实中一般不会向上面那么写出那样的代码,但是有些更为复杂的场景中,我们可能会遇到这样的问题,比如t1拿了锁之后,因为一些异常情况没有释放锁(死循环),也可能t1拿到一个数据库锁,释放锁的时候抛出了异常,没有释放等等,所以我们应该在写代码的时候多考虑死锁的情况,这样才能有效预防死锁程序的出现。下面我们介绍一下避免死锁的几个常见方法:
1.避免一个线程同时获取多个锁。
2.避免在一个资源内占用多个 资源,尽量保证每个锁只占用一个资源。
3.尝试使用定时锁,使用tryLock(timeout)来代替使用内部锁机制。
4.对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败的情况。
5.避免同步嵌套的发生
6.Thread.join()
如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才能从thread.join()返回。线程Thread除了提供join()方法之外,还提供了join(long millis)和join(long millis,int nanos)两个具备超时特性的方法。这两个超时的方法表示,如果线程在给定的超时时间里没有终止,那么将会从该超时方法中返回。下面给出一个例子,创建10个线程,编号0~9,每个线程调用钱一个线程的join()方法,也就是线程0结束了,线程1才能从join()方法中返回,而0需要等待main线程结束。
  1. package com.zejian.test;
  2. /**
  3.  * @author zejian
  4.  * @time 2016年3月13日 下午4:10:03
  5.  * @decrition join案例
  6.  */
  7. public class JoinDemo {
  8.     public static void main(String[] args) {
  9.         Thread previous = Thread.currentThread();
  10.         for(int i=0;i<10;i++){
  11.             //每个线程拥有前一个线程的引用。需要等待前一个线程终止,才能从等待中返回
  12.             Thread thread=new Thread(new Domino(previous),String.valueOf(i));
  13.             thread.start();
  14.             previous=thread;
  15.         }
  16.         System.out.println(Thread.currentThread().getName()+” 线程结束”);
  17.     }
  18. }
  19. class Domino implements Runnable{
  20.     private Thread thread;
  21.     public Domino(Thread thread){
  22.         this.thread=thread;
  23.     }
  24.     @Override
  25.     public void run() {
  26.         try {
  27.             thread.join();
  28.         } catch (InterruptedException e) {
  29.             e.printStackTrace();
  30.         }
  31.         System.out.println(Thread.currentThread().getName()+” 线程结束”);
  32.     }
  33. }

好了,到此本篇结束。

java多线程-概念&创建启动&中断&守护线程&优先级&线程状态(多线程编程之一)

今天开始就来总结一下java多线程的基础知识点,下面是本篇的主要内容(大部分知识点参考java核心技术卷1):

1.什么是线程以及多线程与进程的区别
2.多线程的创建与启动
3.中断线程和守护线程以及线程优先级
4.线程的状态转化关系
1.什么是线程以及多线程与进程的区别
在现代操作在运行一个程序时,会为其创建一个进程。例如启动一个QQ程序,操作系统就会为其创建一个进程。而操作系统中调度的最小单位元是线程,也叫轻量级进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器,堆栈和局部变量等属性,并且能够访问共享的内存变量。处理器在这些线程上高速切换,让使用者感觉到这些线程在同时执行。因此我们可以这样理解:
进程:正在运行的程序,是系统进行资源分配和调用的独立单位。每一个进程都有它自己的内存空间和系统资源。
线程:是进程中的单个顺序控制流,是一条执行路径一个进程如果只有一条执行路径,则称为单线程程序。一个进程如果有多条执行路径,则称为多线程程序。
2.多线程的创建与启动
创建多线程有两种方法,一种是继承Thread类重写run方法,另一种是实现Runnable接口重写run方法。下面我们分别给出代码示例,继承Thread类重写run方法:
  1. public class ThreadByEx extends Thread{
  2.     /**
  3.      * 重写run方法
  4.      */
  5.     @Override
  6.     public void run() {
  7.         System.out.println(“I’m a thread that extends Thread!”);
  8.     }
  9. }

实现Runnable接口重写run方法:

  1. public class ThreadByRunnable implements Runnable{
  2.     /**
  3.      * 实现run方法
  4.      */
  5.     @Override
  6.     public void run() {
  7.         System.out.println(“I’m a thread that implements Runnable !”);
  8.     }
  9. }

怎么启动线程?

  1. public class MainTest {
  2.     public static void main(String[] args) {
  3.         //继承Thread启动的方法
  4.         ThreadByEx t1=new ThreadByEx();
  5.         t1.start();//启动线程
  6.         //实现Runnable启动线程的方法
  7.         ThreadByRunnable r = new ThreadByRunnable();
  8.         Thread t2 =new Thread(r);
  9.         t2.start();//启动线程
  10.     }
  11. }

运行结果:

  1. I’m a thread that extends Thread!
  2. I’m a thread that implements Runnable !
代码相当简单,不过多解释。这里有点需要注意的是调用start()方法后并不是是立即的执行多线程的代码,而是使该线程变为可运行态,什么时候运行多线程代码是由操作系统决定的。
3.中断线程和守护线程以及线程优先级
什么是中断线程?
我们先来看看中断线程是什么?(该解释来自java核心技术一书,我对其进行稍微简化),当线程的run()方法执行方法体中的最后一条语句后,并经由执行return语句返回时,或者出现在方法中没有捕获的异常时线程将终止。在java早期版本中有一个stop方法,其他线程可以调用它终止线程,但是这个方法现在已经被弃用了,因为这个方法会造成一些线程不安全的问题。我们可以把中断理解为一个标识位的属性,它表示一个运行中的线程是否被其他线程进行了中断操作,而中断就好比其他线程对该线程打可个招呼,其他线程通过调用该线程的interrupt方法对其进行中断操作,当一个线程调用interrupt方法时,线程的中断状态(标识位)将被置位(改变),这是每个线程都具有的boolean标志,每个线程都应该不时的检查这个标志,来判断线程是否被中断。而要判断线程是否被中断,我们可以使用如下代码
  1. Thread.currentThread().isInterrupted()
  1. while(!Thread.currentThread().isInterrupted()){
  2.     do something
  3. }

但是如果此时线程处于阻塞状态(sleep或者wait),就无法检查中断状态,此时会抛出InterruptedException异常。如果每次迭代之后都调用sleep方法(或者其他可中断的方法),isInterrupted检测就没必要也没用处了,如果在中断状态被置位时调用sleep方法,它不会休眠反而会清除这一休眠状态并抛出InterruptedException。所以如果在循环中调用sleep,不要去检测中断状态,只需捕获InterruptedException。代码范例如下:

  1. public void run(){
  2.         while(more work to do ){
  3.             try {
  4.                 Thread.sleep(5000);
  5.             } catch (InterruptedException e) {
  6.                 //thread was interrupted during sleep
  7.                 e.printStackTrace();
  8.             }finally{
  9.                 //clean up , if required
  10.             }
  11.         }
同时还有点要注意的就是我们在捉中断异常时尽量按如下形式处理,不要留空白什么都不处理!
不妥的处理方式:
  1. void myTask(){
  2.     …
  3.    try{
  4.        sleep(50)
  5.       }catch(InterruptedException e){
  6.    …
  7.    }
  8. }
  1. void myTask()throw InterruptedException{
  2.     sleep(50)
  3. }

或者

  1. void myTask(){
  2.     …
  3.     try{
  4.     sleep(50)
  5.     }catch(InterruptedException e){
  6.      Thread.currentThread().interrupt();
  7.     }
  8. }
最后关于中断线程,我们这里给出中断线程的一些主要方法:
void interrupt():向线程发送中断请求,线程的中断状态将会被设置为true,如果当前线程被一个sleep调用阻塞,那么将会抛出interrupedException异常。
static boolean interrupted():测试当前线程(当前正在执行命令的这个线程)是否被中断。注意这是个静态方法,调用这个方法会产生一个副作用那就是它会将当前线程的中断状态重置为false。
boolean isInterrupted():判断线程是否被中断,这个方法的调用不会产生副作用即不改变线程的当前中断状态。
static Thread currentThread() : 返回代表当前执行线程的Thread对象。
什么是守护线程?
首先我们可以通过t.setDaemon(true)的方法将线程转化为守护线程。而守护线程的唯一作用就是为其他线程提供服务。计时线程就是一个典型的例子,它定时地发送“计时器滴答”信号告诉其他线程去执行某项任务。当只剩下守护线程时,虚拟机就退出了,因为如果只剩下守护线程,程序就没有必要执行了。另外JVM的垃圾回收、内存管理等线程都是守护线程。还有就是在做数据库应用时候,使用的数据库连接池,连接池本身也包含着很多后台线程,监控连接个数、超时时间、状态等等。最后还有一点需要特别注意的是在java虚拟机退出时Daemon线程中的finally代码块并不一定会执行哦,代码示例:
  1. public class Demon {
  2.     public static void main(String[] args) {
  3.         Thread deamon = new Thread(new DaemonRunner(),“DaemonRunner”);
  4.         //设置为守护线程
  5.         deamon.setDaemon(true);
  6.         deamon.start();//启动线程
  7.     }
  8.     static class DaemonRunner implements Runnable{
  9.         @Override
  10.         public void run() {
  11.             try {
  12.                 Thread.sleep(500);
  13.             } catch (InterruptedException e) {
  14.                 e.printStackTrace();
  15.             }finally{
  16.                 System.out.println(“这里的代码在java虚拟机退出时并不一定会执行哦!”);
  17.             }
  18.         }
  19.     }
  20. }
因此在构建Daemon线程时,不能依靠finally代码块中的内容来确保执行关闭或清理资源的逻辑。
什么是线程优先级
在现代操作系统中基本采用时分的形式调度运行的线程,操作系统会分出一个个时间片,线程会分配到若干时间片,当线程的时间片用完了就会发生线程调度,并等待着下一次分配。线程分配到的时间片多少也决定了线程使用处理器资源的多少,而线程优先级就是决定线程需要多或者少分配一些处理器资源的线程属性。在java线程中,通过一个整型的成员变量Priority来控制线程优先级,每一个线程有一个优先级,默认情况下,一个线程继承它父类的优先级。可以用setPriority方法提高或降低任何一个线程优先级。可以将优先级设置在MIN_PRIORITY(在Thread类定义为1)与MAX_PRIORITY(在Thread类定义为10)之间的任何值。线程的默认优先级为NORM_PRIORITY(在Thread类定义为5)。尽量不要依赖优先级,如果确实要用,应该避免初学者常犯的一个错误。如果有几个高优先级的线程没有进入非活动状态,低优先级线程可能永远也不能执行。每当调度器决定运行一个新线程时,首先会在具有高优先级的线程中进行选择,尽管这样会使低优先级的线程可能永远不会被执行到。因此我们在设置优先级时,针对频繁阻塞(休眠或者I/O操作)的线程需要设置较高的优先级,而偏重计算(需要较多CPU时间或者运算)的线程则设置较低的优先级,这样才能确保处理器不会被长久独占。当然还有要注意就是在不同的JVM以及操作系统上线程的规划存在差异,有些操作系统甚至会忽略对线程优先级的设定,如mac os系统或者Ubuntu系统……..
4.线程的状态转化关系
(1). 新建状态(New):新创建了一个线程对象。
(2). 就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()方法。该状态的线程位于可运行线程池中,变得可运行,等待获取CPU的使用权。
(3). 运行状态(Running):就绪状态的线程获取了CPU,执行程序代码。
(4). 阻塞状态(Blocked):阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况分三种:

– 等待阻塞(WAITING):运行的线程执行wait()方法,JVM会把该线程放入等待池中。

– 同步阻塞(Blocked):运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池中。

– 超时阻塞(TIME_WAITING):运行的线程执行sleep(long)或join(long)方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。

(5). 死亡状态(Dead):线程执行完了或者因异常退出了run()方法,该线程结束生命周期。

图中的方法解析如下:

Thread.sleep():在指定时间内让当前正在执行的线程暂停执行,但不会释放”锁标志”。不推荐使用。
Thread.sleep(long):使当前线程进入阻塞状态,在指定时间内不会执行。
Object.wait()和Object.wait(long):在其他线程调用对象的notify或notifyAll方法前,导致当前线程等待。线程会释放掉它所占有的”锁标志”,从而使别的线程有机会抢占该锁。 当前线程必须拥有当前对象锁。如果当前线程不是此锁的拥有者,会抛出IllegalMonitorStateException异常。 唤醒当前对象锁的等待线程使用notify或notifyAll方法,也必须拥有相同的对象锁,否则也会抛出IllegalMonitorStateException异常,waite()和notify()必须在synchronized函数或synchronized中进行调用。如果在non-synchronized函数或non-synchronized中进行调用,虽然能编译通过,但在运行时会发生IllegalMonitorStateException的异常。
Object.notifyAll():则从对象等待池中唤醒所有等待等待线程
Object.notify():则从对象等待池中唤醒其中一个线程。
Thread.yield()方法 暂停当前正在执行的线程对象,yield()只是使当前线程重新回到可执行状态,所以执行yield()的线程有可能在进入到可执行状态后马上又被执行,yield()只能使同优先级或更高优先级的线程有执行的机会。
Thread.Join():把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行的线程。比如在线程B中调用了线程A的Join()方法,直到线程A执行完毕后,才会继续执行线程B。
好了。本篇线程基础知识介绍到此结束。

nginx流媒体rtmp直播

 

1.先下载安装  nginx 和 nginx-rtmp 编译依赖工具

sudo apt-get install build-essential libpcre3 libpcre3-dev libssl-dev

2. 创建一个nginx目录,并切换到nginx目录

mkdir ~/nginx
cd ~/nginx

3. 下载 nginx 和 nginx-rtmp源码

wget http://nginx.org/download/nginx-1.9.9.tar.gz
wget https://github.com/arut/nginx-rtmp-module/archive/master.zip

4. 安装unzip工具,解压下载的安装包

sudo apt-get install unzip

5.解压 nginx 和 nginx-rtmp安装包

tar -zxvf nginx-1.9.9.tar.gz
unzip master.zip

6. 切换到 nginx-目录

cd nginx-1.9.9

7.添加 nginx-rtmp 模板编译到 nginx

./configure --with-http_ssl_module --add-module=../nginx-rtmp-module-master

8.编译安装

make
sudo make install

9. 安装nginx init 脚本

sudo wget https://raw.github.com/JasonGiedymin/nginx-init-ubuntu/master/nginx -O /etc/init.d/nginx
sudo chmod +x /etc/init.d/nginx
sudo update-rc.d nginx defaults

10. 启动和停止nginx 服务,生成配置文件

sudo service nginx start
sudo service nginx stop

11. 安装 FFmpeg

sudo add-apt-repository ppa:kirillshkrogalev/ffmpeg-next
sudo apt-get update
sudo apt-get install ffmpeg

12. 配置 nginx-rtmp 服务器

打开 /usr/local/nginx/conf/nginx.conf

添加location

location /hls {
            types {
                application/vnd.apple.mpegurl m3u8;
                video/mp2t ts;
            }
            root ~/nginx/www;
            expires -1;
            add_header Cache-Control no-cache;
        }

文件末尾添加rtmp配置

rtmp {
      server {
              listen 1935; 
              publish_time_fix on;
              application myapp {
                      live on; #stream on live allow
                      allow publish all; # control access privilege
                      allow play all; # control access privilege
              }
              application hls {
                      live on;
                      hls on;  #这个参数把直播服务器改造成实时回放服务器。
                      hls_path ~/nginx/www/hls;        #切片视频文件存放位置。
                      wait_key on; #对视频切片进行保护,这样就不会产生马赛克了。 
                      hls_fragment 10s; #每个视频切片的时长。 
                      hls_playlist_length 60s; #总共可以回看的事件,这里设置的是1分钟。 
                      hls_continuous on; #连续模式。 
                      hls_cleanup on; #对多余的切片进行删除。 
                      hls_nested on; #嵌套模式。
              }
      }
}

13. 保存上面配置文件,然后重新启动nginx服务

sudo service nginx restart

14. ffmpeg将rtsp转码为rtmp

(后面的rtmp在其他地方的访问地址为:rtmp://ip:1935/myapp/stream-name,可用VLC media player打开)

ffmpeg -i "rtsp://xxxx" -f flv -r 15 -s 1280x960 -an "rtmp://localhost:1935/myapp/stream-name"

15. ffmpeg将rtsp转码为m3u8:

(m3u8的访问地址为:http://ip:port/hls/stream-name.m3u8,port为nginx的访问端口号)

ffmpeg -i "rtsp://admin:auto12345@114.242.47.195:2012" -strict -2 -c:v libx264 -c:a aac -f hls /var/www/hls/stream-name.m3u8

16. html中使用video.js访问流媒体服务器:

国人处理好的videojs包为:http://pan.baidu.com/s/1kVuU3PX,此包已经支持IE8的视频播放。

<!DOCTYPE html>
<html>
<head>
  <title>Video.js | HTML5 Video Player</title>

  <!-- Chang URLs to wherever Video.js files will be hosted -->
  <link href="video-js.css" rel="stylesheet" type="text/css">
  <!-- video.js must be in the <head> for older IEs to work. -->
  <script src="video.js"></script>

  <!-- Unless using the CDN hosted version, update the URL to the Flash SWF -->
  <script>
    videojs.options.flash.swf = "video-js.swf";
  </script>
</head>
<body>
  <video id="example_video_1" class="video-js vjs-default-skin" controls preload="none" width="640" height="264"
      poster="oceans-clip.png"
      data-setup="{}">
   <source src="rtmp://e.5iwf.cn:1935/myapp/video5" type="rtmp/flv">
    <!-- 如果上面的rtmp流无法播放,就播放hls流 -->
    <source src="http://e.5iwf.cn:9999/hls/video5.m3u8" type='application/x-mpegURL'>
  </video>
</body>
</html>

C#语法糖($)(?.)(??)

实际上是C# 6.0对string.Format的改进

1.将字符串文本标识为内插字符串($)

根据微软的例子来看:

using System;

public class Example
{
   public static void Main()
   {
      string name = "John";
      string greeting = $"Hello, {name}!";
      Console.WriteLine(greeting);
   }
}

相当于原先的string.Format这种必须使用占位符,极容易出错:

string name = "John";
string str = string.Fromat("Hello,{0}!",name);

而使用$则不容易出现错误,可以这样写:

string name = "John";
string str = $"Hello,{name}!";

2.新增语法糖:(?.)
这也是C#6.0的语法,这叫Null-Conditional Operator(null条件运算符)
我们经常需要判断对象是否为null(不判断呢就会报异常System.NullReferenceException之类的),我们一般像这样:

List<string> list = null;
if (list != null)
{
    string[] strArr = list.ToArray();
}

或者这样

List<string> list = null;
string[] strArr = (list != null ? list.ToArray() : null);

使用这个null条件运算符就可以这样简单的完成这件事了

List<string> list = null;
string[] strArr = list?.ToArray();

解释一下,当 list 为 null 时就不进行后面的ToArray(),返回null,当 list 不为 null 就ToArray();
这里要注意一下,前面strArr这个得是个可null的值


3.多个(??)
?? 运算符称作 null 合并运算符。 如果此运算符的左操作数不为 null,则此运算符将返回左操作数;否则返回右操作数。
演示一下:

int? num = null;//前面设定了个可空的int,后面想null时变为0就好
int num1 = num ?? 0;