Drools规则引擎初探

1.drools是什么

Drools是为Java量身定制的基于Charles  Forgy的RETE算法的规则引擎的实现。具有了OO接口的RETE,使得商业规则有了更自然的表达。

Rule是什么呢?

一条规则是对商业知识的编码。一条规则有 attributes ,一个 Left Hand Side ( LHS )和一个 Right Hand Side ( RHS )。Drools 允许下列几种 attributes : salience , agenda-group , no-loop , auto-focus , duration , activation-group  。

规则的 LHS 由一个或多个条件( Conditions )组成。当所有的条件( Conditions )都满足并为真时, RHS 将被执行。 RHS 被称为结果( Consequence )。 LHS 和 RHS 类似于

if(<LHS>){

<RHS>

}

下面介绍几个术语:

对新的数据和被修改的数据进行规则的匹配称为模式匹配( Pattern Matching )。进行匹配的引擎称为推理机( Inference Engine )。被访问的规则称为 ProductionMemory ,被推理机进行匹配的数据称为 WorkingMemory 。 Agenda 管理被匹配规则的执行。推理机所采用的模式匹配算法有下列几种: Linear , RETE , Treat , Leaps 。这里注意加红的地方,对数据的修改也会触发重新匹配,即对 WorkingMemory中的数据进行了修改。

然后规则引擎大概是这个样子的:

这个图也很好理解,就是推理机拿到数据和规则后,进行匹配,然后把匹配的规则和数据传递给Agenda。

规则引擎实现了数据同逻辑的完全解耦。规则并不能被直接调用,因为它们不是方法或函数,规则的激发是对 WorkingMemory 中数据变化的响应。结果( Consequence ,即 RHS )作为 LHS events 完全匹配的 Listener 。

数据被 assert 进 WorkingMemory 后,和 RuleBase 中的 rule 进行匹配(确切的说应该是 rule 的 LHS ),如果匹配成功这条 rule 连同和它匹配的数据(此时就叫做 Activation )一起被放入 Agenda ,等待 Agenda 来负责安排激发 Activation (其实就是执行 rule 的 RHS ),上图中的菱形部分就是在 Agenda 中来执行的, Agenda 就会根据冲突解决策略来安排 Activation 的执行顺序。

下面附上drools规则引擎的执行过程:

2.rete算法

参考链接:Rete Algorithm

rete在拉丁文里是net network的意思,这个算法由 Charles Forgy  博士在他的博士论文里提到。

这个算法可以分为两个部分,一个是如何编译规则,一个是如何执行。原话(The Rete algorithm can be broken into 2 parts: rule compilation and runtime execution.)

rule compilation 就是如何通过对所有规则进行处理,生成一个有效的辨别网络。而一个辨别网络,则对数据进行过滤,使数据一步步往下传送。数据刚进入网络,有很多的匹配条件,这里可以理解为:逻辑表达式为true or false,然后在网络里往下传递的时候,匹配的条件越来越少,最后到达一个终止节点。

在这个论文里Dr Charles描述了这么几个节点,Node:

2.rete算法

参考链接:Rete Algorithm

rete在拉丁文里是net network的意思,这个算法由 Charles Forgy  博士在他的博士论文里提到。

这个算法可以分为两个部分,一个是如何编译规则,一个是如何执行。原话(The Rete algorithm can be broken into 2 parts: rule compilation and runtime execution.)

rule compilation 就是如何通过对所有规则进行处理,生成一个有效的辨别网络。而一个辨别网络,则对数据进行过滤,使数据一步步往下传送。数据刚进入网络,有很多的匹配条件,这里可以理解为:逻辑表达式为true or false,然后在网络里往下传递的时候,匹配的条件越来越少,最后到达一个终止节点。

在这个论文里Dr Charles描述了这么几个节点,Node:

这里对其中的几个节点做一下简单介绍,另外说一下如何运作的。

  • 首先,root node是所有的对象都可以进入的节点,也是辨别网络的一个入口,这个可以理解为一个虚节点,其实可能并不存在。
  • 然后立马进入到ObjectTypeNode节点,这是一个对象类型节点。很明显,这里承载的是一个对象,可以理解为是java中的某个new Object(),在这个算法里,这个节点的作用就是为了保证不做一些无用功,什么无用功呢,就是不是对每个规则,进入的对象都要去辨别一遍,而是确定的对象类型,去做跟他相关的辨别,其实就是match。那么怎么做到呢?这里用到了一个hashMap,每次进入网络的对象,都会在这个map中通过hash,找到一个对应的辨别路径去辨别,即match。附上英文原文:(Drools extends Rete by optimizing the propagation from ObjectTypeNode to AlphaNode using hashing. Each time an AlphaNode is added to an ObjectTypeNode it adds the literal value as a key to the HashMap with the AlphaNode as the value. When a new instance enters the ObjectType node, rather than propagating to each AlphaNode, it can instead retrieve the correct AlphaNode from the HashMap,thereby avoiding unnecessary literal checks.)
    一个图来说明:

所有经过ObjectTypeNode的对象都会走到下一个节点,下一个节点可以是下面的几种:AlphaNodes, LeftInputAdapterNodes and BetaNodes。后面两个节点是AlphaNodes节点的一些变种,AlphaNodes节点是用来判断一些条件的。可以理解为一些逻辑表达式的计算。

下面开始上图:

  • 这个图就是传递进一个Cheese对象,然后依次判断是否满足条件:1.判断name是否是“cheddar”,2.如果判断1通过了,继续判断strength是否是strong。这是最简单了一种情况了,这里附上对应的规则描述,后面会继续讲解:
    rule “cheessRule” when
    $cheese:Cheese(name == “cheddar” && strength == “strong”)
    then
    ……
    end

3.maven依赖

这里列了一些后面的一些例子需要用到的maven依赖

<!--kie api 构建kie虚拟文件系统,关联decisiontable和drl文件,很关键 -->
<dependency>
<groupId>org.kie</groupId>
<artifactId>kie-api</artifactId>
</dependency>
<!-- 规则引擎核心包,里面包含了RETE引擎和LEAPS 引擎-->
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-core</artifactId>
</dependency>
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-compiler</artifactId>
</dependency>
<!-- 决策表依赖-->
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-decisiontables</artifactId>
</dependency>
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-templates</artifactId>
</dependency>

4.规则文件:.drl or xls

我们一般用到的也就这两种形式,一个是drl文件,是drools规则引擎提供的最原生的方式,语法很简单,具体语法见

还有一个是决策表,决策表可以是xls也可以是csv,我们一般用xls比较多。而且好理解。xls就是一个excel文件。ps:在使用的过程中,遇到很多坑,其中一个最大的坑是mac系统的问题,这里后面会安利。

drl文件

首先来看下drl文件,这个在第2条讲解node的时候已经提到过了。
举例:



rule "ageUp12" when
 $student: Student(age > 2)
then
 $student.ageUp12();
end

rule "nameMax" when
 $student: Student(name == "max")
then
 $student.nameMax();
retract($student);
end
简单说明:以第一个rule为例
  • package 定义了规则文件的一个命名空间,和java中的package无关。
  • import 这里可以有多个,就是在规则文件里引用到的java类。
  • rule 用来定义一个规则,这里名字不可重复,后面跟一个when关键字,翻译过来就是,规则 名ageUp12,当满足……
  • when 和then之间是逻辑表达式,也就是辨别条件,其中$student:Student(age >2)这里其实包含了两个意思,一个是满足age>2的Student对象,一个是把这个对象赋值给$student变量,这样后面就可以引用这个变量了。逻辑表达式写在小括号里,如果是多个条件,可以用逗号分隔,如$sutdent :Student(age > 2,name==”max”)
  • then和end之间来定义action,即当满足age>2的时候,做什么操作,这里可以像在java方法里一样,调用任何一个java类的方法,只要import了这个类且在前面定义了这个变量

第二个例子可以看到有个retract($student),这里是用到了drools内部提供的一个函数,具体见后续关于drools语法介绍的博客


决策表(decisiontable)

决策表就是一个excel文件,可以是xls(xlsx暂不支持)或者csv是个表格,看上去也很直观,即便是不懂代码的人看了也能看懂,不像drl文件那么多语法。关键的一点是:decisiontable也是最终转成drl文件来让drools规则引擎来解析执行的。*.xls到*.drl的转换这个在后面的wiki会说到。

直接上图吧

这里可以暂时忽略那些背景色,只是为了好区分没个模块的作用

这里忽略文件开始的空行,从有数据的第一行开始解释说明:

第一行,第一列:RuleSet 第二列。这里RuleSet可以省略的,累似drl文件中的package

第二行,第一列:Import 第二列具体的java类,这里和drl文件里的Improt相对应,多个引用类用逗号分隔

第三行,是个对这个决策表的说明

第四行,第一列:RuleTable FirstDecisionTable 这一行很关键 指明这是一个决策表,并且下面的几行都是具体的规则,就好比上面几行是一些准备条件,下面才是真正干活的地方,这里来个说明

第五行,CONDITION行,这一行可以有两种列名:CONDITION ACTION。CONDITION列就是drl里的辨别条件,  ACTION则是具体的操作,即满足前面几列的CONDITION的条件后,会执行什么操作,这里CONDITION一定在ACTION前面,ACTION可以有多个列, 单个ACTION里的多个操作用逗号分隔,末尾要加分号结尾这里很重要,不然会有解析错误

第六行,紧挨着CONDITION的一行,可以在这里声明下面要用的到对象,对应drl文件里的$student:Student()

第七行,是辨别条件逻辑表达式,如:student.getAge()==$param则对应drl里的age==12这里$param是对应列每个单元格的值,然后这里需要特别说明下,针对于非字符串,如整数,小数等,可以直接使用$param,但是如果单元格里是字符串,则需要加双引号。(ps:mac里的双引号是斜的,一定要保证是竖着”的)另外,如果有多个值,可以用逗号隔开,然后可以用$1,$2提取变量值,如第一个ACTION里的student.doAction1($1,”$2″)

第八行仍然是注释行,可以添加每一个CONDITON ACTION列的说明。

下面的每一行就是对应的某些条件的取值了。

参考:decisionTable

SpringBoot Redis分布式锁

随着现在分布式架构越来越盛行,在很多场景下需要使用到分布式锁。分布式锁的实现有很多种,比如基于数据库、 zookeeper 等,本文主要介绍使用 Redis 做分布式锁的方式,并封装成spring boot starter,方便使用

一. Redis 分布式锁的实现以及存在的问题

锁是针对某个资源,保证其访问的互斥性,在实际使用当中,这个资源一般是一个字符串。使用 Redis 实现锁,主要是将资源放到 Redis 当中,利用其原子性,当其他线程访问时,如果 Redis 中已经存在这个资源,就不允许之后的一些操作。spring boot使用 Redis 的操作主要是通过 RedisTemplate 来实现,一般步骤如下:

1.将锁资源放入 Redis (注意是当key不存在时才能放成功,所以使用 setIfAbsent 方法):

redisTemplate.opsForValue().setIfAbsent("key", "value");

2.设置过期时间

redisTemplate.expire("key", 30000, TimeUnit.MILLISECONDS);

3.释放锁

redisTemplate.delete("key");

一般情况下,这样的实现就能够满足锁的需求了,但是如果在调用 setIfAbsent 方法之后线程挂掉了,即没有给锁定的资源设置过期时间,默认是永不过期,那么这个锁就会一直存在。所以需要保证设置锁及其过期时间两个操作的原子性,spring data的 RedisTemplate 当中并没有这样的方法。

但是在jedis当中是有这种原子操作的方法的,需要通过 RedisTemplate 的 execute 方法获取到jedis里操作命令的对象,代码如下:

String result = redisTemplate.execute(new RedisCallback<String>() {
    @Override
    public String doInRedis(RedisConnection connection) throws DataAccessException {
        JedisCommands commands = (JedisCommands) connection.getNativeConnection();
        return commands.set(key, "锁定的资源", "NX", "PX", expire);
    }
});

注意: Redis 从2.6.12版本开始 set 命令支持 NX 、 PX 这些参数来达到 setnx 、 setex 、 psetex 命令的效果。
文档参见: http://doc.redisfans.com/string/set.html

NX: 表示只有当锁定资源不存在的时候才能 SET 成功。利用 Redis 的原子性,保证了只有第一个请求的线程才能获得锁,而之后的所有线程在锁定资源被释放之前都不能获得锁。

PX: expire 表示锁定的资源的自动过期时间,单位是毫秒。具体过期时间根据实际场景而定

这样在获取锁的时候就能够保证设置 Redis 值和过期时间的原子性,避免前面提到的两次 Redis 操作期间出现意外而导致的锁不能释放的问题。但是这样还是可能会存在一个问题,考虑如下的场景顺序:

  • 线程T1获取锁
  • 线程T1执行业务操作,由于某些原因阻塞了较长时间
  • 锁自动过期,即锁自动释放了
  • 线程T2获取锁
  • 线程T1业务操作完毕,释放锁(其实是释放的线程T2的锁)

按照这样的场景顺序,线程T2的业务操作实际上就没有锁提供保护机制了。所以,每个线程释放锁的时候只能释放自己的锁,即锁必须要有一个拥有者的标记,并且也需要保证释放锁的原子性操作。

因此在获取锁的时候,可以生成一个随机不唯一的串放入当前线程中,然后再放入 Redis 。释放锁的时候先判断锁对应的值是否与线程中的值相同,相同时才做删除操作。

Redis 从2.6.0开始通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值,文档参见: http://doc.redisfans.com/script/eval.html

因此我们可以通过 Lua 脚本来达到释放锁的原子操作,定义 Lua 脚本如下:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

具体意思可以参考上面提供的文档地址

使用 RedisTemplate 执行的代码如下:

// 使用Lua脚本删除Redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
// spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本
Long result = redisTemplate.execute(new RedisCallback<Long>() {
    public Long doInRedis(RedisConnection connection) throws DataAccessException {
        Object nativeConnection = connection.getNativeConnection();
        // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
        // 集群模式
        if (nativeConnection instanceof JedisCluster) {
            return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
        }

        // 单机模式
        else if (nativeConnection instanceof Jedis) {
            return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
        }
        return 0L;
    }
});

代码中分为集群模式和单机模式,并且两者的方法、参数都一样,原因是spring封装的执行脚本的方法中( RedisConnection 接口继承于 RedisScriptingCommands 接口的 eval 方法),集群模式的方法直接抛出了不支持执行脚本的异常(虽然实际是支持的),所以只能拿到 Redis 的connection来执行脚本,而 JedisCluster 和 Jedis 中的方法又没有实现共同的接口,所以只能分开调用。

spring封装的集群模式执行脚本方法源码:

# JedisClusterConnection.java
/**
 * (non-Javadoc)
 * @see org.springframework.data.redis.connection.RedisScriptingCommands#eval(byte[], org.springframework.data.redis.connection.ReturnType, int, byte[][])
 */
@Override
public <T> T eval(byte[] script, ReturnType returnType, int numKeys, byte[]... keysAndArgs) {
    throw new InvalidDataAccessApiUsageException("Eval is not supported in cluster environment.");
}

至此,我们就完成了一个相对可靠的 Redis 分布式锁,但是,在集群模式的极端情况下,还是可能会存在一些问题,比如如下的场景顺序( 本文暂时不深入开展 ):

  • 线程T1获取锁成功
  • Redis 的master节点挂掉,slave自动顶上
  • 线程T2获取锁,会从slave节点上去判断锁是否存在,由于Redis的master slave复制是异步的,所以此时线程T2可能成功获取到锁

为了可以以后扩展为使用其他方式来实现分布式锁,定义了接口和抽象类,所有的源码如下:

# DistributedLock.java 顶级接口
/**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:05
 * @version 1.0.0
 */
public interface DistributedLock {

    public static final long TIMEOUT_MILLIS = 30000;

    public static final int RETRY_TIMES = Integer.MAX_VALUE;

    public static final long SLEEP_MILLIS = 500;

    public boolean lock(String key);

    public boolean lock(String key, int retryTimes);

    public boolean lock(String key, int retryTimes, long sleepMillis);

    public boolean lock(String key, long expire);

    public boolean lock(String key, long expire, int retryTimes);

    public boolean lock(String key, long expire, int retryTimes, long sleepMillis);

    public boolean releaseLock(String key);
}
# AbstractDistributedLock.java 抽象类,实现基本的方法,关键方法由子类去实现
/**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:10:57
 * @version 1.0.0
 */
public abstract class AbstractDistributedLock implements DistributedLock {

    @Override
    public boolean lock(String key) {
        return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
    }

    @Override
    public boolean lock(String key, int retryTimes) {
        return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
    }

    @Override
    public boolean lock(String key, int retryTimes, long sleepMillis) {
        return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
    }

    @Override
    public boolean lock(String key, long expire) {
        return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
    }

    @Override
    public boolean lock(String key, long expire, int retryTimes) {
        return lock(key, expire, retryTimes, SLEEP_MILLIS);
    }

}
# RedisDistributedLock.java Redis分布式锁的实现
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;

/**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:14
 * @version 1.0.0
 */
public class RedisDistributedLock extends AbstractDistributedLock {

    private final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);

    private RedisTemplate<Object, Object> redisTemplate;

    private ThreadLocal<String> lockFlag = new ThreadLocal<String>();

    public static final String UNLOCK_LUA;

    static {
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call("get",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call("del",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
        UNLOCK_LUA = sb.toString();
    }

    public RedisDistributedLock(RedisTemplate<Object, Object> redisTemplate) {
        super();
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
        boolean result = setRedis(key, expire);
        // 如果获取锁失败,按照传入的重试次数进行重试
        while((!result) && retryTimes-- > 0){
            try {
                logger.debug("lock failed, retrying..." + retryTimes);
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                return false;
            }
            result = setRedis(key, expire);
        }
        return result;
    }

    private boolean setRedis(String key, long expire) {
        try {
            String result = redisTemplate.execute(new RedisCallback<String>() {
                @Override
                public String doInRedis(RedisConnection connection) throws DataAccessException {
                    JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                    String uuid = UUID.randomUUID().toString();
                    lockFlag.set(uuid);
                    return commands.set(key, uuid, "NX", "PX", expire);
                }
            });
            return !StringUtils.isEmpty(result);
        } catch (Exception e) {
            logger.error("set redis occured an exception", e);
        }
        return false;
    }

    @Override
    public boolean releaseLock(String key) {
        // 释放锁的时候,有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除
        try {
            List<String> keys = new ArrayList<String>();
            keys.add(key);
            List<String> args = new ArrayList<String>();
            args.add(lockFlag.get());

            // 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
            // spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本

            Long result = redisTemplate.execute(new RedisCallback<Long>() {
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    Object nativeConnection = connection.getNativeConnection();
                    // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                    // 集群模式
                    if (nativeConnection instanceof JedisCluster) {
                        return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }

                    // 单机模式
                    else if (nativeConnection instanceof Jedis) {
                        return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }
                    return 0L;
                }
            });

            return result != null && result > 0;
        } catch (Exception e) {
            logger.error("release lock occured an exception", e);
        }
        return false;
    }

}

二. 基于 AOP 的 Redis 分布式锁

在实际的使用过程中,分布式锁可以封装好后使用在方法级别,这样就不用每个地方都去获取锁和释放锁,使用起来更加方便。

首先定义个注解:

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:10:36
 * @version 1.0.0
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {

    /** 锁的资源,redis的key*/
    String value() default "default";

    /** 持锁时间,单位毫秒*/
    long keepMills() default 30000;

    /** 当获取失败时候动作*/
    LockFailAction action() default LockFailAction.CONTINUE;

    public enum LockFailAction{
        /** 放弃 */
        GIVEUP,
        /** 继续 */
        CONTINUE;
    }

    /** 重试的间隔时间,设置GIVEUP忽略此项*/
    long sleepMills() default 200;

    /** 重试次数*/
    int retryTimes() default 5;
}

装配分布式锁的bean

import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;

import com.itopener.lock.redis.spring.boot.autoconfigure.lock.DistributedLock;
import com.itopener.lock.redis.spring.boot.autoconfigure.lock.RedisDistributedLock;

/**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:31
 * @version 1.0.0
 */
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class DistributedLockAutoConfiguration {

    @Bean
    @ConditionalOnBean(RedisTemplate.class)
    public DistributedLock redisDistributedLock(RedisTemplate<Object, Object> redisTemplate){
        return new RedisDistributedLock(redisTemplate);
    }

}

定义切面(spring boot配置方式)

import java.lang.reflect.Method;
import java.util.Arrays;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

import com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock;
import com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock.LockFailAction;
import com.itopener.lock.redis.spring.boot.autoconfigure.lock.DistributedLock;

/**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:22
 * @version 1.0.0
 */
@Aspect
@Configuration
@ConditionalOnClass(DistributedLock.class)
@AutoConfigureAfter(DistributedLockAutoConfiguration.class)
public class DistributedLockAspectConfiguration {

    private final Logger logger = LoggerFactory.getLogger(DistributedLockAspectConfiguration.class);

    @Autowired
    private DistributedLock distributedLock;

    @Pointcut("@annotation(com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock)")
    private void lockPoint(){

    }

    @Around("lockPoint()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable{
        Method method = ((MethodSignature) pjp.getSignature()).getMethod();
        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        String key = redisLock.value();
        if(StringUtils.isEmpty(key)){
            Object[] args = pjp.getArgs();
            key = Arrays.toString(args);
        }
        int retryTimes = redisLock.action().equals(LockFailAction.CONTINUE) ? redisLock.retryTimes() : 0;
        boolean lock = distributedLock.lock(key, redisLock.keepMills(), retryTimes, redisLock.sleepMills());
        if(!lock) {
            logger.debug("get lock failed : " + key);
            return null;
        }

        //得到锁,执行方法,释放锁
        logger.debug("get lock success : " + key);
        try {
            return pjp.proceed();
        } catch (Exception e) {
            logger.error("execute locked method occured an exception", e);
        } finally {
            boolean releaseResult = distributedLock.releaseLock(key);
            logger.debug("release lock : " + key + (releaseResult ? " success" : " failed"));
        }
        return null;
    }
}

spring boot starter还需要在 resources/META-INF 中添加 spring.factories 文件

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
com.itopener.lock.redis.spring.boot.autoconfigure.DistributedLockAutoConfiguration,
com.itopener.lock.redis.spring.boot.autoconfigure.DistributedLockAspectConfiguration

这样封装之后,使用spring boot开发的项目,直接依赖这个starter,就可以在方法上加 RedisLock 注解来实现分布式锁的功能了,当然如果需要自己控制,直接注入分布式锁的bean即可

@Autowired
private DistributedLock distributedLock;

如果需要使用其他的分布式锁实现,继承 AbstractDistributedLock 后实现获取锁和释放锁的方法即可

TDDL 基础

TDDL 基础

分布式数据库演化

  在正式学习 TDDL 之前应该先做一些知识准备,了解一下 TDDL 产生的背景以及解决了哪些问题。TDDL 是和分布式数据库相关的,首先来看一下分布式数据库的演化过程。

  • 单库单表
    通常刚开始的时候,应用的数据比较少,也不会很复杂,所以应用只有一个数据库,数据库中的表是一张完整的表,这也是我们刚开始接触数据库时的数据库形态。
  • 读写分离
    随着业务的发展,数据量与数据访问量不断增长,很多时候应用的主要业务是读多写少的,比如说一些新闻网站,运营在后台上传了一堆新闻之后,所有的用户都会去读取这些新闻资讯,因此数据库面临的读压力远大于写压力,那么这时候在原来数据库 Master 的基础上增加一个备用数据库 Slave,备库和主库存储着相同的数据,但只提供读服务,不提供写服务。以后的写操作以及事务中的读操作就走主库,其它读操作就走备库,这就是所谓的读写分离。
    读写分离会直接带来两个问题:
    • 数据复制问题
      因为最新写入的数据只会存储在主库中,之后想要在备库中读取到新数据就必须要从主库复制过来,这会带来一定的延迟,造成短期的数据不一致性。但这个问题应该也没有什么特别好的办法,主要依赖于数据库提供的数据复制机制,常用的是根据数据库日志 binary-log 实现数据复制。
    • 数据源选择问题
      读写分离之后我们都知道写要找主库,读要找备库,但是程序不知道,所以我们在程序中应该根据 SQL 来判断出是读操作还是写操作,进而正确选择要访问的数据库。
  • 垂直分库
    数据量与访问量继续上升时,主备库的压力都在变大,这时候可以根据业务特点考虑将数据库垂直拆分,即把数据库中不同的业务单元的数据划分到不同的数据库里面。比如说,还是新闻网站,注册用户的信息与新闻是没有多大关系的,数据库访问压力大时可以尝试把用户信息相关的表放在一个数据库,新闻相关的表放在一个数据库中,这样大大减小了数据库的访问压力。
    垂直分库会带来以下问题:
    • ACID 被打破
      数据分到不同的数据库之后,原来的事务操作将会受很大影响,比如说注册账户的时候需要在用户表和用户信息表中插入一条数据,单机数据库利用事务可以很好地完成这个任务,但是多机将会变得比较麻烦。以下两点也是因为数据库在不同的机子上而导致简单的操作变得复杂,不一一分析了。
    • Join 操作困难
    • 外键约束受影响
  • 水平分表
    经过长时间积累,特别对于 ugc 的业务,数据量会达到惊人的地步,每张表存放着大量的数据,任何 CRUD 都变成了一次极其消耗性能的操作,这个时候就会考虑水平分表,把一张表内大量的数据拆分成多张子表,比如说原来一张表中存放 50000 条数据,水平分成 5 张表之后,每张表只要存放 10000 条数据。这种结构可以比较容易的存储和维护海量数据。
    水平分表会在垂直分库的基础上带来更多的影响:
    • 自增主键会有影响
      这个影响很明显,分表中如果使用的是自增主键的话,那么就不能产生唯一的 ID 了,因为逻辑上来说多个分表其实都属于一张表,自增主键无法标识每一条数据。
    • 有些单表查询会变成多表
      比如说 count 操作,原来是一张表的问题,现在要从多张分表中共同查询才能得到结果。

  可以看出,数据库从单机走向分布式将会面临很多很多的问题,TDDL 就是为了解决这些问题而生的。

TDDL 基本结构

  • TDDL 基本结构
    如上所述,经过读写分离以及分库分表之后,数据库的结构变得比较复杂,在逻辑上大概是这样的:

TDDL 为了解决每一层所要应对的问题,它在结构上至上而下也分了三层,分别是 Matrix 层、Group层以及 Atom 层。具体来说,从逻辑上来看,最顶层是要进行分库分表才过渡到中间层的,分库分表所带来的问题是在 Matrix 层解决;中间层是经过读写分离和主备切换才会出现最底层,因此读写分离与主备切换的工作由 Group 层解决;至于 Atom 层,它面对的是实实在在的每一个数据库,更多的工作在与对数据库的连接管理,比如说当数据库的 IP 地址发生改变时,Atom 层要动态感知,以免连接找不到地址。(这段写得好乱,先凑合着看)

  • Matrix 层
    如上所述,Matrix 层可以解决分库分表带来的问题,从本质上来看,分库分表带来的最直接的影响是数据访问的路由。单库单表的时候,什么都不用想,就是去这个 DB 中找到这张 Table 再进行查询,但是多库多表的时候就必须要考虑数据存到哪个数据库,为什么要存到这个数据库诸如此类的问题。这里面涉及到数据访问路由算法,它规定了数据的存储位置,同样也由它来指明该去哪里查询数据,常见的数据访问路由算法有以下几种:
    • 固定哈希算法
      固定哈希就再简单不过了,就是根据某个字段(如整形的 id 或者字符串的 hashcode)对分库的数量或者分表的数量进行取模,根据余数路由到对应的位置。下面图中的例子,数据库垂直拆分成 4 个,其中有一张表水平拆分成两张,利用固定哈希算法进行路由的过程如下:
  • 一致性哈希算法
    固定哈希算法足够简单实用,基本能保证数据均匀分布,它也是 TDDL 的默认路由算法,但是在数据库扩容的时候,固定哈希算法带来的数据迁移成本也是不得不考虑的。依然是上面的例子,数据库拆分成 4 个,当需要增加数据库的时候,假设变成 5 个,由于取模的结果发生变化,原来数据库中的绝大部分数据都要进行迁移,只有在数据库倍增的时候,数据迁移量才是最少的,但也高达 50%,而且倍增的成本比较高。
    所以某年某月某某某就提出了另外一个算法,一致性哈希算法,它的原理就是通过该算法计算出 key 的 hashcode 之后对 2^32 取模,那么数据都会落在 0~2^32 所组成的环中(不要问我为什么是 2 的 32 次方,一看这个数字就知道里面有很多故事);同样的,可以利用一致性哈希算法对机器的唯一属性计算所在位置,然后数据存储在顺时针方向最近的机器上。如图所示:

对于一致性哈希,增删机器的成本就降低很多了,比如说在上图 node2 与 node4 之间增加一台机器 node5,那么需要迁移的数据只分布在 node2 与 node5 之间,相比固定哈希来说迁移量小了很多。

  • 虚拟节点
    一致性哈希已经可以解决大部分需求了,但是对于数据集中在热点的情况,一致性哈希同样面临比较大的挑战。比如说,上图的 node2 与 node4 之间集中了整个环中的大部分数据,当加入 node5 之后,其实起到的效果比较有限,因为还是要有大量的数据进行迁移。
    引入虚拟节点之后,情况就不一样了,所谓虚拟节点,它就是物理节点的映射,一个物理节点可以复制出多个虚拟节点,尽可能的让它均匀分布在环上,那么即使数据再集中,其实也会存储在不同的节点上,很好地起到了负载均衡的作用。
  • 自定义路由规则
    这是最不常用的方法,不过 TDDL 也支持,你可以实现自己的算法进行数据访问路由,但一般要么效果很差要么成本很高。

  Matrix 层除了要解决数据访问路由问题之外,还需要顺带提供其他的功能。比如说,因为路由之前要判断出 SQL 语句是读操作还是写操作,所以 Matrix 层需要解析 SQL,另外还要优化,计算等等,其实都是围绕着数据访问路由这个功能展开的,内容比较多,一时半会还学不来。

  • Group 层
    读写分离与主备切换带来的问题由 Group 层解决。首先简单介绍一下主备切换,由于主库或者备库都有可能挂掉,但很小概率同时挂,所以当一方挂掉的时候,Group 层要马上把流量切到另一个库,保证挂掉一个不会让应用挂掉。
    读写分离最大的问题是数据复制,通常有两种复制场景,一种是镜像复制,即主库和从库的数据结构是一模一样的,通常根据主库上的日志变化,在从库中执行相同的操作;另外一种是非对称复制,意思就是主库与备库是以不同的方式分库的,它们的结构虽然相同,但是主备库中存储的记录是不相同的,主要目的是查询条件不同时,把请求分发到更加适合的库去操作。举个例子,对于订单数据库,买家会根据自己的 ID 去查自己的交易记录,所以主库可以用买家 ID 分库,保证单个买家的记录在同一个数据库中;但是卖家如果想看交易记录的话可能就得从多个库中进行查询,这时候可以利用卖家 ID 进行分库作为备库,这样一来主备库的复制就不能简单的镜像复制了,在进行复制操作之前还需要进行路由。在 TDDL 中,数据复制使用了中间件愚公,真是个好名字。
  • Atom层
    Atom 层主要负责连接管理以及结合中间件精卫进行数据迁移,先记下来,具体细节不清楚。

总结

  本文主要从 TDDL 产生的背景与系统结构方面进行了简单的介绍,算是对 TDDL 的一个入门学习,知道 TDDL 是做什么的,但是目前依然欠缺实战经验,也没有足够的时间去深入学习原理,所以文章写得比较粗浅,难免有理解错或表达不清的地方。

打包 FatJar 方法小结

什么是 FatJar

FatJar 又称作 uber-Jar,是包含所有依赖的 Jar 包。Jar 包中嵌入了除 java 虚拟机以外的所有依赖。我们知道 Java 的依赖分为两种, 零散的 .class 文件和把多个 .class 文件以 zip 格式打包而成 jar 文件。FatJar 是一个 all-in-one Jar 包。FatJar 技术可以让那些用于最终发布的 Jar 便于部署和运行。

三种打包方法

我们知道 .java 源码文件会被编译器编译成字节码.class 文件。Java 虚拟机执行的是 .class 文件。一个 java 程序可以有很多个 .class文件。这些 .class 文件可以由 java 虚拟机的类装载器运行期装载到内存里。java 虚拟机可以从某个目录装载所有的 .class 文件,但是这些零散的.class 文件并不便于分发。所有 java 支持把零散的.class 文件打包成 zip 格式的 .jar 文件,并且虚拟机的类装载器支持直接装载 .jar 文件。

一个正常的 java 程序会有若干个.class 文件和所依赖的第三方库的 jar 文件组成。

1. 非遮蔽方法(Unshaded)

非遮蔽是相对于遮蔽而说的,可以理解为一种朴素的办法。解压所有 jar 文件,再重新打包成一个新的单独的 jar 文件。

借助 Maven Assembly Plugin 都可以轻松实现非遮蔽方法的打包。

Maven Assembly Plugin

Maven Assembly Plugin 是一个打包聚合插件,其主要功能是把项目的编译输出协同依赖,模块,文档和其他文件打包成一个独立的发布包。使用描述符(descriptor)来配置需要打包的物料组合。并预定义了常用的描述符,可供直接使用。

预定义描述符如下

  • bin 只打包编译结果,并包含 README, LICENSE 和 NOTICE 文件,输出文件格式为 tar.gz, tar.bz2 和 zip。
  • jar-with-dependencies 打包编译结果,并带上所有的依赖,如果依赖的是 jar 包,jar 包会被解压开,平铺到最终的 uber-jar 里去。输出格式为 jar。
  • src 打包源码文件。输出格式为 tar.gz, tar.bz2 和 zip。
  • project 打包整个项目,除了部署输出目录 target 以外的所有文件和目录都会被打包。输出格式为 tar.gz, tar.bz2 和 zip。

除了预定义的描述符,用户也可以指定描述符,以满足不同的打包需求。

打包成 uber-jar,需要使用预定义的 jar-with-dependencies 描述符:

在 pom.xml 中加入如下配置

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-assembly-plugin</artifactId>

<version>CHOOSE LATEST VERSION HERE</version>

<configuration>

<descriptorRefs>

<descriptorRef>jar-with-dependencies</descriptorRef>

</descriptorRefs>

</configuration>

<executions>

<execution>

<id>assemble-all</id>

<phase>package</phase>

<goals>

<goal>single</goal>

</goals>

</execution>

</executions>

</plugin>

Gradle Java plugin

gradle 下打包一个非遮蔽的 jar 包,有不少插件可以用,但是由于 gradle 自身的灵活性,可以直接用 groove 的 dsl 实现。

apply plugin: ‘java’

jar {

from {

(configurations.runtime).collect {

it.isDirectory() ? it : zipTree(it)

}

}

}

非遮蔽方法会把所有的 jar 包里的文件都解压到一个目录里,然后在打包同一个 fatjar 中。对于复杂应用很可能会碰到同名类相互覆盖问题。

2. 遮蔽方法(Shaded)

遮蔽方法会把依赖包里的类路径进行修改到某个子路径下,这样可以一定程度上避免同名类相互覆盖的问题。最终发布的 jar 也不会带入传递依赖冲突问题给下游。

Maven Shade Plugin

在 pom.xml 中加入如下配置

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-shade-plugin</artifactId>

<version>3.1.1</version>

<configuration>

<!– put your configurations here –>

</configuration>

<executions>

<execution>

<phase>package</phase>

<goals>

<goal>shade</goal>

</goals>

</execution>

</executions>

</plugin>

Gradle Shadow plugin

Gradle shadow plugin 使用非常简单,简单声明插件后就可以生效。

plugins {

id ‘com.github.johnrengelman.shadow’ version ‘2.0.4’

id ‘java’

}

shadowJar {

include ‘*.jar’

include ‘*.properties’

exclude ‘a2.properties’

}

遮蔽方法依赖修改 class 的字节码,更新依赖文件的包路径达到规避同名同包类冲突的问题,但是改名也会带来其他问题,比如代码中使用 Class.forName 或 ClassLoader.loadClass 装载的类,Shade Plugin 是感知不到的。同名文件覆盖问题也没法杜绝,比如META-INF/services/javax.script.ScriptEngineFactory不属于类文件,但是被覆盖后会出现问题。

3. 嵌套方法(Jar of Jars)

还是一种办法就是在 jar 包里嵌套其他 jar,这个方法可以彻底避免解压同名覆盖的问题,但是这个方法不被 JVM 原生支持,因为 JDK 提供的 ClassLoader 仅支持装载嵌套 jar 包的 class 文件。所以这种方法需要自定义 ClassLoader 以支持嵌套 jar。

Onejar Maven Plugin

One-JAR 就是一个基于上面嵌套 jar 实现的工具。onejar-maven-plugin 是社区基于 onejar 实现的 maven 插件。

<plugin>

<groupId>com.jolira</groupId>

<artifactId>onejar-maven-plugin</artifactId>

<version>1.4.4</version>

<executions>

<execution>

<goals>

<goal>one-jar</goal>

</goals>

</execution>

</executions>

</plugin>

Spring boot plugin

One-JAR 有点年久失修,好久没有维护了,Spring Boot 提供的 Maven Plugin 也可以打包 Fatjar,支持非遮蔽和嵌套的混合模式,并且支持 maven 和 gradle 。

<plugin>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

<configuration>

<layout>ZIP</layout>

<requiresUnpack>

<dependency>

<groupId>org.jruby</groupId>

<artifactId>jruby-complete</artifactId>

</dependency>

</requiresUnpack>

</configuration>

</plugin>

plugins {

id ‘org.springframework.boot’ version ‘2.0.4.RELEASE’

}

bootJar {

requiresUnpack ‘**/jruby-complete-*.jar’

}

requiresUnpack 参数可以定制那些 jar 不希望被解压,采用嵌套的方式打包到 Fatjar 内部。

其打包后的内部结构为

example.jar

|

+-META-INF

| +-MANIFEST.MF

+-org

| +-springframework

| +-boot

| +-loader

| +-<spring boot loader classes>

+-BOOT-INF

+-classes

| +-mycompany

| +-project

| +-YourClasses.class

+-lib

+-dependency1.jar

+-dependency2.jar

应用的类文件被放置到 BOOT-INF/classes 目录,依赖包被放置到 BOOT-INF/lib 目录。

查看 META-INF/MANIFEST.MF 文件,其内容为

Main-Class: org.springframework.boot.loader.JarLauncher

Start-Class: com.mycompany.project.MyApplication

启动类是固定的 org.springframework.boot.loader.JarLauncher,应用程序的入口类需要配置成 Start-Class。这样做的目的主要是为了支持嵌套 jar 包的类装载,替换掉默认的 ClassLoader。

但是函数计算 Java Runtime 需要的 jar 包是一种打包结构,在服务端运行时会解压开,./lib 目录加到 classpath 中,但不会调用 Main-Class。所以自定义 ClassLoader 是不生效的,所以不要使用嵌套 jar 结构,除非在入口函数指定重新定义 ClassLoader 或者 classpath 以支持 BOOT-INF/classes 和 BOOT-INF/lib 这样的定制化的类路径。

小结

单从 Fatjar 的角度看, Spring boot maven/gradle 做得最精致。但是 jar 包内部的自定义路径解压开以后和函数计算是不兼容的。所以如果用于函数计算打包,建议使用 Unshaded 或者 Shared 的打包方式,但是需要自己注意文件覆盖问题。

Spring MVC学习笔记

1.Spring MVC流程图

2.注解驱动控制器
请求映射到控制器可以通过URL,请求参数,请求方法,请求头四个方面信息项;

@RequestMapping支持Ant风格(?,*)和{XX}占位符的URL;
请求方法参数:@RequstParam,@CookieValue,@RequetHeader,Servlet API对象:HttpServletRequet,HttpServletResponse,I/O对象:OutputStream,以及java.util.Locale,java.security.Principal;

使用HttpMessageConverter<T>:DispatchServlet默认已经安装了RequestMappingHandlerAdapter作为HandlerAdapter的组件实现类,HttpMessageConverter即有RequestMappingHandlerAdapter使用,将请求信息转换为对象,或将对象转换为相应信息;

处理模型的数据:ModelAndView,@ModelAttribute,Map或Model,@SessionAttribute;
SpringMVC在调用方法前会创建一个隐藏的模型对象,如果处理方法的入参为Map或Model类型,则SpringMVC会将隐含模型的引用传递给这些参数,同时也可以添加新的属性数据;

3.处理方法的数据绑定
SpringMVC根据请求方法签名不同,将请求参数中的信息以一定的方式转换并绑定到请求方法的入参中,并进行数据转换,数据格式化,数据校验等;
3.1 流程解析

3.2 ConversionService
ConversionService是Spring类型转换体系的核心接口,位于org.springframework.core.convert包中,也是该包中唯一个接口。主要定义了四个方法:

使用FactoryBean产生;

Elasticsearch集群配置详解

前言

在我们es系列文章开篇介绍中,已经提到,elasticsearch是天生支持集群的,他不需要依赖其他的服务发现和注册的组件,如zookeeper这些,因为他内置了一个名字叫ZenDiscovery的模块,是elasticsearch自己实现的一套用于节点发现和选主等功能的组件,所以elasticsearch做起集群来非常简单,不需要太多额外的配置和安装额外的第三方组件。

数据分片

我们在前面的篇章中介绍过elasticsearch的一些重要的基本概念了,但是为了循序渐进的学习elasticsearch,我们并没有把所有概念都一次性列出来,而是在学习的过程中不断的补充,那现在我们需要学习elasticsearch的集群了,就需要补充一个基本概念,叫做“分片(shard)”,分片在集群中起到了非常关键的作用,那接下来,我们来看看分片是什么。

分片的概念和作用

分片是一个底层的工作单元 ,它仅保存了全部数据中的一部分,一个分片是一个 Lucene 的实例,它本身就是一个完整的搜索引擎,我们的文档被存储和索引到分片内。

每个分片又有一个副本分片,副本分片就是主分片的copy,对于分布式搜索引擎来说, 分片及副本的分配将是高可用及快速搜索响应的设计核心.主分片与副本都能处理查询请求,它们的唯一区别在于只有主分片才能处理更新请求,这就有点像我们常见的“主从”的概念了。

Elasticsearch 是利用分片将数据分发到集群内各处的。分片是数据的容器,文档保存在分片内,分片又被分配到集群内的各个节点里。当你的集群规模扩大或者缩小时, Elasticsearch 会自动的在各节点中迁移分片,使得数据仍然均匀分布在集群里。所以集群和分片关系非常密切,学习集群之前,需要了解分片的概念和作用。

ES集群节点类型

通过前面对elasticsearch的基本概念了解到,在一个elasticsearch集群(cluster)中,有多个节点(node),并且这些节点有不同的类型:我们部署一个elasticsearch集群常用的节点类型有:主节点,数据节点,候选主节点,客户端节点。

这些节点的类型是通过在elasticsearch主配置文件path/to/config/elasticsearch.yml文件指定的,配置属性如下:

node.master: true/false #该节点有机会成为master节点
node.data: true/false #该节点可以存储数据

通过以上两行配置的组合,我们可以指定出elasticsearch的节点类型。

每种节点的作用如下:

1、主节点:

主节点负责创建索引、删除索引、分配分片、追踪集群中的节点状态等工作。Elasticsearch中的主节点的工作量相对较轻,用户的请求可以发往集群中任何一个节点,由该节点负责分发和返回结果,而不需要经过主节点转发。而主节点是由候选主节点通过ZenDiscovery机制选举出来的,所以要想成为主节点,首先要先成为候选主节点。

2、候选主节点

在elasticsearch集群初始化或者主节点宕机的情况下,由候选主节点中选举其中一个作为主节点。指定候选主节点的配置为:node.master: true。

但是在elasticsearch集群中,会有偶尔出现这样的一种现象,就是当主节点负载压力过大,或者集中环境中的网络问题,导致其他节点与主节点通讯的时候,主节点没来的及响应,这样的话,某些节点就认为主节点宕机,重新选择新的主节点,这样的话整个集群的工作就有问题了,比如我们集群中有10个节点,其中7个候选主节点,1个候选主节点成为了主节点,这种情况是正常的情况。但是如果现在出现了我们上面所说的主节点响应不及时,导致其他某些节点认为主节点宕机而重选主节点,那就有问题了,这剩下的6个候选主节点可能有3个候选主节点去重选主节点,最后集群中就出现了两个主节点的情况,这种情况官方成为“脑裂现象”。

如果避免这种问题呢?主要从以下几个方面来避免:

(1)尽量不要让候选主节点同时作为数据节点,因为数据节点是需要承担存储和搜索的工作的,压力会很大。所以如果该节点同时作为候选主节点和数据节点,那么一旦选上它作为主节点了,这时主节点的工作压力将会非常大,出现脑裂现象的概率就增加了。

(2)配置主节点的响应时间,在默认情况下,主节点3秒没有响应,其他节点就认为主节点宕机了,那我们可以把该时间设置的长一点,该配置是:

discovery.zen.ping_timeout: 3

(3)配置候选主节点最小数量,从脑裂现象出现的原因中,我们看到了,如果集群中有部分候选主节点重新选择主节点,那集群中的候选主节点就会被分成两部分,导致集群的可用节点个数和实际的节点个数不一致,那这样的话,我们就可用通过配置的方式,指定如果候选主节点个数达到某个值时,才能进行主节点选举,而该配置属性如下:

discovery.zen.minimum_master_nodes

该属性默认值是1,官方的推荐值是(N/2)+1,其中N是候选主节点的数量,那这样的话,比如我们集群有7个候选主节点,那么通过官方推荐值,我们应该设置为4,这样的话,至少有4个候选主节点都认为需要重选主节点的情况下才进行选举。

3、数据节点

数据节点负责数据的存储和相关具体操作,比如CRUD、搜索、聚合。所以,数据节点对机器配置要求比较高,首先需要有足够的磁盘空间来存储数据,其次数据操作对系统CPU、Memory和IO的性能消耗都很大。通常随着集群的扩大,需要增加更多的数据节点来提高可用性。指定数据节点的配置:

node.data: true。

elasticsearch是允许一个节点既做候选主节点也做数据节点的,但是数据节点的负载较重,所以需要考虑将二者分离开,设置专用的候选主节点和数据节点,避免因数据节点负载重导致主节点不响应。

4、客户端节点

按照官方的介绍,客户端节点就是既不做候选主节点也不做数据节点的节点,只负责请求的分发、汇总等等,但是这样的工作,其实任何一个节点都可以完成,因为在elasticsearch中一个集群内的节点都可以执行任何请求,其会负责将请求转发给对应的节点进行处理。所以单独增加这样的节点更多是为了负载均衡。指定该节点的配置为:

node.master: false 
node.data: false
Elasticsearch集群配置详解

集群配置

接下来,我们做一个集群测试一下,我这里使用4个节点做测试,分别为:

es1 192.168.85.133:9300
es2 192,168.85.133:9500
es3 192.168.85.135:9300
es4 192.168.85.135:9500

1、es1 既是候选主节点也是数据节点,elasticsearch.yml配置如下:

cluster.name: elasticsearch #集群的名称,同一个集群该值必须设置成相同的
node.name: es1 #该节点的名字
node.master: true #该节点有机会成为master节点
node.data: true #该节点可以存储数据
network.bind_host: 0.0.0.0 #设置绑定的IP地址,可以是IPV4或者IPV6
network.publish_host: 192.168.85.133 #设置其他节点与该节点交互的IP地址
network.host: 192.168.85.133 #该参数用于同时设置bind_host和publish_host
transport.tcp.port: 9300 #设置节点之间交互的端口号
transport.tcp.compress: true #设置是否压缩tcp上交互传输的数据
http.port: 9200 #设置对外服务的http端口号
http.max_content_length: 100mb #设置http内容的最大大小
http.enabled: true #是否开启http服务对外提供服务
discovery.zen.minimum_master_nodes: 2 #设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。官方推荐(N/2)+1
discovery.zen.ping_timeout: 120s #设置集群中自动发现其他节点时ping连接的超时时间
discovery.zen.ping.unicast.hosts: ["192.168.85.133:9300","192.168.85.133:9500","192.168.85.135:9300"] #设置集群中的Master节点的初始列表,可以通过这些节点来自动发现其他新加入集群的节点
http.cors.enabled: true #跨域连接相关设置
http.cors.allow-origin: "*" #跨域连接相关设置

2、es2 既是候选主节点也是数据节点,elasticsearch.yml配置如下:

cluster.name: elasticsearch #集群的名称,同一个集群该值必须设置成相同的
node.name: es2 #该节点的名字
node.master: true #该节点有机会成为master节点
node.data: true #该节点可以存储数据
network.bind_host: 0.0.0.0 #设置绑定的IP地址,可以是IPV4或者IPV6
network.publish_host: 192.168.85.133 #设置其他节点与该节点交互的IP地址
network.host: 192.168.85.133 #该参数用于同时设置bind_host和publish_host
transport.tcp.port: 9500 #设置节点之间交互的端口号
transport.tcp.compress: true #设置是否压缩tcp上交互传输的数据
http.port: 9400 #设置对外服务的http端口号
http.max_content_length: 100mb #设置http内容的最大大小
http.enabled: true #是否开启http服务对外提供服务
discovery.zen.minimum_master_nodes: 2 #设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。官方推荐(N/2)+1
discovery.zen.ping_timeout: 120s #设置集群中自动发现其他节点时ping连接的超时时间
discovery.zen.ping.unicast.hosts: ["192.168.85.133:9300","192.168.85.133:9500","192.168.85.135:9300"] #设置集群中的Master节点的初始列表,可以通过这些节点来自动发现其他新加入集群的节点
http.cors.enabled: true #跨域连接相关设置
http.cors.allow-origin: "*" #跨域连接相关设置

3、es3 是候选主节点,elasticsearch.yml配置如下:

cluster.name: elasticsearch #集群的名称,同一个集群该值必须设置成相同的
node.name: es3 #该节点的名字
node.master: true #该节点有机会成为master节点
node.data: false #该节点可以存储数据
network.bind_host: 0.0.0.0 #设置绑定的IP地址,可以是IPV4或者IPV6
network.publish_host: 192.168.85.135 #设置其他节点与该节点交互的IP地址
network.host: 192.168.85.135 #该参数用于同时设置bind_host和publish_host
transport.tcp.port: 9300 #设置节点之间交互的端口号
transport.tcp.compress: true #设置是否压缩tcp上交互传输的数据
http.port: 9200 #设置对外服务的http端口号
http.max_content_length: 100mb #设置http内容的最大大小
http.enabled: true #是否开启http服务对外提供服务
discovery.zen.minimum_master_nodes: 2 #设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。官方推荐(N/2)+1
discovery.zen.ping_timeout: 120s #设置集群中自动发现其他节点时ping连接的超时时间
discovery.zen.ping.unicast.hosts: ["192.168.85.133:9300","192.168.85.133:9500","192.168.85.135:9300"] #设置集群中的Master节点的初始列表,可以通过这些节点来自动发现其他新加入集群的节点
http.cors.enabled: true #跨域连接相关设置
http.cors.allow-origin: "*" #跨域连接相关设置

4、es4 是数据节点,elasticsearch.yml配置如下:

cluster.name: elasticsearch #集群的名称,同一个集群该值必须设置成相同的
node.name: es4 #该节点的名字
node.master: false #该节点有机会成为master节点
node.data: true #该节点可以存储数据
network.bind_host: 0.0.0.0 #设置绑定的IP地址,可以是IPV4或者IPV6
network.publish_host: 192.168.85.135 #设置其他节点与该节点交互的IP地址
network.host: 192.168.85.135 #该参数用于同时设置bind_host和publish_host
transport.tcp.port: 9500 #设置节点之间交互的端口号
transport.tcp.compress: true #设置是否压缩tcp上交互传输的数据
http.port: 9400 #设置对外服务的http端口号
http.max_content_length: 100mb #设置http内容的最大大小
http.enabled: true #是否开启http服务对外提供服务
discovery.zen.minimum_master_nodes: 2 #设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。官方推荐(N/2)+1
discovery.zen.ping_timeout: 120s #设置集群中自动发现其他节点时ping连接的超时时间
discovery.zen.ping.unicast.hosts: ["192.168.85.133:9300","192.168.85.133:9500","192.168.85.135:9300"] #设置集群中的Master节点的初始列表,可以通过这些节点来自动发现其他新加入集群的节点
http.cors.enabled: true #跨域连接相关设置
http.cors.allow-origin: "*" #跨域连接相关设置

以上配置就是我们集群中4个elasticsearch节点的配置,我们没有两个既是候选主节点也是数据节点,一个候选主节点,一个数据节点,没有配置客户端节点,因为客户端节点仅起到负载均衡的作用。

验证群状态

查看集群状态的REST API

我们逐一的把集群中的节点启动起来,然后使用REST API查看集群状态,访问集群中任意一个节点即可:

GET ip:port/_cluster/health

响应的结果:

{
"cluster_name" : "elasticsearch",
"status" : "green",
"timed_out" : false,
"number_of_nodes" : 4,
"number_of_data_nodes" : 3,
"active_primary_shards" : 46,
"active_shards" : 92,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 100.0
}

我们来看看返回结果中的几个关键的属性:

  • 集群状态有3种(status):
  • red,表示有主分片没有分配,某些数据不可用。
  • yellow,表示主分片都已分配,数据都可用,但是有复制分片没有分配。
  • green,表示主分片和复制分片都已分配,一切正常。
  • 那么通过以上返回的结果中的status属性,我们可以知道,当前的elasticsearch集群是正常的。
  • number_of_nodes属性说明我们集群中有4个节点。
  • number_of_data_nodes说明我们集群中有3个为数据节点,所有数据分片储存在这3个数据节点中。
  • active_primary_shards属性说明主分片有46个(我们有10个索引,其中9个索引有5个主分片,1个索引有1个主分片,合计46个主分片)
  • active_shards属性说明所有分片(主分片 + 副本分片)总共是92个

使用head插件管理ES集群

我们在之前安装了es-head插件的时候提到,es-head插件可以管理我们elasticsearch集群的,那现在我们就使用之前安装好的es-head插件来看看elasticsearch集群如果做到图形化界面的管理。

打开首页我们可以看到以下界面:

Elasticsearch集群配置详解
  • 连接任意一个节点看到的结果都是一样的,这个就验证了我们上面提到的概念:用户的请求可以发往集群中任何一个节点,由该节点负责请求分发和返回结果。在该界面中,是以表格的形式展示数据的,纵向的是集群中的索引,横向的是集群中的节点。
  • 横向有4行,分别是4个我们自己命名的节点,其中标记了es1是星号,代表它为主节点。
  • 然后我们可以看到,每个单元格中都有一些绿色的小方块,小方块中标有数字,这些绿色小方块就代表分片,其中边框加粗的是主分片,没加粗的是副本分片。
  • 我们在创建索引时,如果没有指定分片的数量,那么默认是5个,再加上每个分片都有一个副本,所以加起来就有10个绿色小方块了。
  • 每个绿色小方块上都有0-4的数字,分别代表0-4号分片,和对应的0-4号分片的副本分片,我们点击绿色小方块,会弹出一个窗口,列出该分片的一些信息,其中有条信息是“primary”,如果为true,就代表主分片,发false就代表副本分片。
  • 其中es3节点是没有给它分配分片的,那是因为当时我们给该节点设置的是候选主节点,不是数据节点,所以他不承担数据存储的工作。
  • 这10个分片(包含主分片和副本分片)被elasticsearch分配到各个数据节点中,并且根据我们上面对分片概念的了解,主分片和副本分片都可以接收数据查询请求,但是只有主分片才能接收数据修改请求,副本分片的数据是从主分片同步过来的。

测试数据

最后,我们从用户的角度上来看看,向集群中各个节点发送一个查询请求,测试数据的一致性。我们先往集群中新增一条数据,以store索引作为测试,向任意一个节点发送一个新增文档的请求:

PUT /store/employee/3
{
"name":"王五",
"age":28,
"about":"我叫王五,但不是隔壁老王",
"interests":["code","music"]
}

看到以下返回结果提示添加成功:

Elasticsearch集群配置详解

但这里有一个问题,就是该文档会被存储到哪一个主分片中呢?首先这肯定不会是随机的,否则将来要获取文档的时候我们就不知道从何处寻找了。实际上,这个过程是根据下面这个公式决定的:

shard = hash(routing) % number_of_primary_shards

routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值。 routing 通过 hash 函数生成一个数字,然后这个数字再除以 number_of_primary_shards (主分片的数量)后得到 余数 。这个分布在 0 到 number_of_primary_shards-1 之间的余数,就是我们所寻求的文档所在分片的位置。

这就顺便解释了为什么我们要在创建索引的时候就确定好主分片的数量,并且永远不会改变这个数量,因为如果数量变化了,那么所有之前路由的值都会无效,文档也再也找不到了。

那接下来我们看看每个节点是否可以查询到该数据:

Elasticsearch集群配置详解
Elasticsearch集群配置详解
Elasticsearch集群配置详解
Elasticsearch集群配置详解

这4个节点都是可以查询到刚刚我们新增的文档id为3的数据,集群正常运作。在我们整个集群配置的过程中,需要我们配置的东西很少,所以elasticsearch真的可以很轻松的完成一个规模超大的集群,并且在该集群中存储海量的数据。

高性能 Java 缓存库—Caffeine

1、介绍

在本文中,我们来看看 Caffeine — 一个高性能的 Java 缓存库

缓存和 Map 之间的一个根本区别在于缓存可以回收存储的 item。

回收策略为在指定时间删除哪些对象。此策略直接影响缓存的命中率 — 缓存库的一个重要特征。

Caffeine 因使用 Window TinyLfu 回收策略,提供了一个近乎最佳的命中率

2、依赖

我们需要在 pom.xml 中添加 caffeine 依赖:

高性能 Java 缓存库—Caffeine

3、填充缓存

让我们来了解一下 Caffeine 的三种缓存填充策略:手动、同步加载和异步加载。

首先,我们为要缓存中存储的值类型写一个类:

高性能 Java 缓存库—Caffeine

3.1、手动填充

在此策略中,我们手动将值放入缓存之后再检索。

让我们初始化缓存:

高性能 Java 缓存库—Caffeine

现在,我们可以使用 getIfPresent 方法从缓存中获取一些值。 如果缓存中不存在此值,则此方法将返回 null:

高性能 Java 缓存库—Caffeine

我们可以使用 put 方法手动填充缓存:

高性能 Java 缓存库—Caffeine

我们也可以使用 get 方法获取值,该方法将一个参数为 key 的 Function 作为参数传入。如果缓存中不存在该键,则该函数将用于提供回退值,该值在计算后插入缓存中:

高性能 Java 缓存库—Caffeine

get 方法可以原子方式执行计算。这意味着您只进行一次计算 — 即使多个线程同时请求该值。这就是为什么使用 get 优于 getIfPresent

有时我们需要手动使一些缓存的值失效

高性能 Java 缓存库—Caffeine

3.2、同步加载

这种加载缓存的方法使用了与用于初始化值的 Function 相似的手动策略的 get 方法。让我们看看如何使用它。

首先,我们需要初始化缓存:

高性能 Java 缓存库—Caffeine

现在我们可以使用 get 方法检索值:

高性能 Java 缓存库—Caffeine

我们也可以使用 getAll 方法获取一组值:

高性能 Java 缓存库—Caffeine

从传递给 build 方法的底层后端初始化函数检索值。 这使得可以使用缓存作为访问值的主要门面(Facade)。

3.3、异步加载

此策略的作用与之前相同,但是以异步方式执行操作,并返回一个包含值的CompletableFuture

高性能 Java 缓存库—Caffeine

我们可以以相同的方式使用 getgetAll 方法,同时考虑到他们返回的是CompletableFuture

高性能 Java 缓存库—Caffeine

CompletableFuture 有许多有用的 API,您可以在此文中获取更多内容。

4、值回收

Caffeine 有三个值回收策略:基于大小,基于时间和参考。

4.1、基于大小的回收

这种回收方式假定当超过配置的缓存大小限制时会发生回收。 获取大小有两种方法:缓存中计数对象,或获取权重。

让我们看看如何计算缓存中的对象。当缓存初始化时,其大小等于零:

高性能 Java 缓存库—Caffeine

当我们添加一个值时,大小明显增加:

高性能 Java 缓存库—Caffeine

我们可以将第二个值添加到缓存中,这导致第一个值被删除:

高性能 Java 缓存库—Caffeine

值得一提的是,在获取缓存大小之前,我们调用了 cleanUp 方法。 这是因为缓存回收被异步执行,这种方法有助于等待回收的完成。

我们还可以传递一个 weigher Function 来获取缓存的大小:

高性能 Java 缓存库—Caffeine

当 weight 超过 10 时,值将从缓存中删除:

高性能 Java 缓存库—Caffeine

4.2、基于时间的回收

这种回收策略是基于条目的到期时间,有三种类型:

  • 访问后到期 — 从上次读或写发生后,条目即过期。
  • 写入后到期 — 从上次写入发生之后,条目即过期
  • 自定义策略 — 到期时间由 Expiry 实现独自计算

让我们使用 expireAfterAccess 方法配置访问后过期策略:

高性能 Java 缓存库—Caffeine

要配置写入后到期策略,我们使用 expireAfterWrite 方法:

高性能 Java 缓存库—Caffeine

要初始化自定义策略,我们需要实现 Expiry 接口:

高性能 Java 缓存库—Caffeine

4.3、基于引用的回收

我们可以将缓存配置为启用缓存键值的垃圾回收。为此,我们将 key 和 value 配置为 弱引用,并且我们可以仅配置软引用以进行垃圾回收。

当没有任何对对象的强引用时,使用 WeakRefence 可以启用对象的垃圾收回收。SoftReference 允许对象根据 JVM 的全局最近最少使用(Least-Recently-Used)的策略进行垃圾回收。有关 Java 引用的更多详细信息,请参见此处。

我们应该使用 Caffeine.weakKeys()Caffeine.weakValues()Caffeine.softValues() 来启用每个选项:

高性能 Java 缓存库—Caffeine

5、刷新

可以将缓存配置为在定义的时间段后自动刷新条目。让我们看看如何使用 refreshAfterWrite 方法:

高性能 Java 缓存库—Caffeine

这里我们应该要明白 expireAfterrefreshAfter 之间的区别。 当请求过期条目时,执行将发生阻塞,直到 build Function 计算出新值为止。

但是,如果条目可以刷新,则缓存将返回一个旧值,并异步重新加载该值

6、统计

Caffeine 有一种记录缓存使用情况的统计方式

高性能 Java 缓存库—Caffeine

7、结论

在本文中,我们熟悉了 Java 的 Caffeine 缓存库。 我们看到了如何配置和填充缓存,以及如何根据我们的需要选择适当的到期或刷新策略。

搞懂“分布式锁”

但是随着分布式的快速发展,本地的加锁往往不能满足我们的需要,在我们的分布式环境中上面加锁的方法就会失去作用。

于是人们为了在分布式环境中也能实现本地锁的效果,也是纷纷各出其招,今天让我们来聊一聊一般分布式锁实现的套路。

为何需要分布式锁

Martin Kleppmann 是英国剑桥大学的分布式系统的研究员,之前和 Redis 之父 Antirez 进行过关于 RedLock(红锁,后续有讲到)是否安全的激烈讨论。

Martin 认为一般我们使用分布式锁有两个场景:

  • 效率:使用分布式锁可以避免不同节点重复相同的工作,这些工作会浪费资源。比如用户付了钱之后有可能不同节点会发出多封短信。
  • 正确性:加分布式锁同样可以避免破坏正确性的发生,如果两个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。

分布式锁的一些特点

当我们确定了在不同节点上需要分布式锁,那么我们需要了解分布式锁到底应该有哪些特点?

分布式锁的特点如下:

  • 互斥性:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥。
  • 可重入性:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁。
  • 锁超时:和本地锁一样支持锁超时,防止死锁。
  • 高效,高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。
  • 支持阻塞和非阻塞:和 ReentrantLock 一样支持 lock 和 trylock 以及 tryLock(long timeOut)。
  • 支持公平锁和非公平锁(可选):公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的。这个一般来说实现的比较少。

常见的分布式锁

我们了解了一些特点之后,我们一般实现分布式锁有以下几个方式:

  • MySQL
  • ZK
  • Redis
  • 自研分布式锁:如谷歌的 Chubby。

下面分开介绍一下这些分布式锁的实现原理。

MySQL

首先来说一下 MySQL 分布式锁的实现原理,相对来说这个比较容易理解,毕竟数据库和我们开发人员在平时的开发中息息相关。

对于分布式锁我们可以创建一个锁表:

搞懂“分布式锁”,看这篇文章就对了

前面我们所说的 lock(),trylock(long timeout),trylock() 这几个方法可以用下面的伪代码实现。

lock()

lock 一般是阻塞式的获取锁,意思就是不获取到锁誓不罢休,那么我们可以写一个死循环来执行其操作:

搞懂“分布式锁”,看这篇文章就对了

mysqlLock.lcok 内部是一个 sql,为了达到可重入锁的效果,我们应该先进行查询,如果有值,需要比较 node_info 是否一致。

这里的 node_info 可以用机器 IP 和线程名字来表示,如果一致就加可重入锁 count 的值,如果不一致就返回 false。如果没有值就直接插入一条数据。

伪代码如下:

搞懂“分布式锁”,看这篇文章就对了

需要注意的是这一段代码需要加事务,必须要保证这一系列操作的原子性。

tryLock() 和 tryLock(long timeout)

tryLock() 是非阻塞获取锁,如果获取不到就会马上返回,代码如下:

搞懂“分布式锁”,看这篇文章就对了

tryLock(long timeout) 实现如下:

搞懂“分布式锁”,看这篇文章就对了

mysqlLock.lock 和上面一样,但是要注意的是 select … for update 这个是阻塞的获取行锁,如果同一个资源并发量较大还是有可能会退化成阻塞的获取锁。

unlock()

unlock 的话如果这里的 count 为 1 那么可以删除,如果大于 1 那么需要减去 1。

搞懂“分布式锁”,看这篇文章就对了

锁超时

我们有可能会遇到我们的机器节点挂了,那么这个锁就不会得到释放,我们可以启动一个定时任务,通过计算一般我们处理任务的时间。

比如是 5ms,那么我们可以稍微扩大一点,当这个锁超过 20ms 没有被释放我们就可以认定是节点挂了然后将其直接释放。

MySQL 小结:

  • 适用场景:MySQL 分布式锁一般适用于资源不存在数据库,如果数据库存在比如订单,可以直接对这条数据加行锁,不需要我们上面多的繁琐的步骤。

比如一个订单,我们可以用 select * from order_table where id = ‘xxx’ for update 进行加行锁,那么其他的事务就不能对其进行修改。

  • 优点:理解起来简单,不需要维护额外的第三方中间件(比如 Redis,ZK)。
  • 缺点:虽然容易理解但是实现起来较为繁琐,需要自己考虑锁超时,加事务等等。性能局限于数据库,一般对比缓存来说性能较低。对于高并发的场景并不是很适合。

乐观锁

前面我们介绍的都是悲观锁,这里想额外提一下乐观锁,在我们实际项目中也是经常实现乐观锁,因为我们加行锁的性能消耗比较大,通常我们对于一些竞争不是那么激烈。

但是其又需要保证我们并发的顺序执行使用乐观锁进行处理,我们可以对我们的表加一个版本号字段。

那么我们查询出来一个版本号之后,update 或者 delete 的时候需要依赖我们查询出来的版本号,判断当前数据库和查询出来的版本号是否相等,如果相等那么就可以执行,如果不等那么就不能执行。

这样的一个策略很像我们的 CAS(Compare And Swap),比较并交换是一个原子操作。这样我们就能避免加 select * for update 行锁的开销。

ZooKeeper

ZooKeeper 也是我们常见的实现分布式锁方法,相比于数据库如果没了解过 ZooKeeper 可能上手比较难一些。

ZooKeeper 是以 Paxos 算法为基础的分布式应用程序协调服务。ZK 的数据节点和文件目录类似,所以我们可以用此特性实现分布式锁。

我们以某个资源为目录,然后这个目录下面的节点就是我们需要获取锁的客户端,未获取到锁的客户端注册需要注册 Watcher 到上一个客户端,可以用下图表示:

搞懂“分布式锁”,看这篇文章就对了

/lock 是我们用于加锁的目录,/resource_name 是我们锁定的资源,其下面的节点按照我们加锁的顺序排列。

Curator

Curator 封装了 ZooKeeper 底层的 API,使我们更加容易方便的对 ZooKeeper 进行操作,并且它封装了分布式锁的功能,这样我们就不需要在自己实现了。

Curator 实现了可重入锁(InterProcessMutex),也实现了不可重入锁(InterProcessSemaphoreMutex)。在可重入锁中还实现了读写锁。

InterProcessMutex

InterProcessMutex 是 Curator 实现的可重入锁,我们可以通过下面的一段代码实现我们的可重入锁:

搞懂“分布式锁”,看这篇文章就对了

我们利用 acuire 进行加锁,release 进行解锁。

加锁的流程具体如下:

  • 首先进行可重入的判定:这里的可重入锁记录在 ConcurrentMap

如果 threadData.get(currentThread)是有值的那么就证明是可重入锁,然后记录就会加 1。

我们之前的 MySQL 其实也可以通过这种方法去优化,可以不需要 count 字段的值,将这个维护在本地可以提高性能。

  • 然后在我们的资源目录下创建一个节点:比如这里创建一个 /0000000002 这个节点,这个节点需要设置为 EPHEMERAL_SEQUENTIAL 也就是临时节点并且有序。
  • 获取当前目录下所有子节点,判断自己的节点是否位于子节点第一个。
  • 如果是第一个,则获取到锁,那么可以返回。
  • 如果不是第一个,则证明前面已经有人获取到锁了,那么需要获取自己节点的前一个节点。

/0000000002 的前一个节点是 /0000000001,我们获取到这个节点之后,再上面注册 Watcher(这里的 Watcher 其实调用的是 object.notifyAll(),用来解除阻塞)。

  • object.wait(timeout) 或 object.wait():进行阻塞等待,这里和我们第 5 步的 Watcher 相对应。

解锁的具体流程:

  • 首先进行可重入锁的判定:如果有可重入锁只需要次数减 1 即可,减 1 之后加锁次数为 0 的话继续下面步骤,不为 0 直接返回。
  • 删除当前节点。
  • 删除 threadDataMap 里面的可重入锁的数据。

读写锁

Curator 提供了读写锁,其实现类是 InterProcessReadWriteLock,这里的每个节点都会加上前缀:

private static final String READ_LOCK_NAME = "__READ__"; 
private static final String WRITE_LOCK_NAME = "__WRIT__";

根据不同的前缀区分是读锁还是写锁,对于读锁,如果发现前面有写锁,那么需要将 Watcher 注册到和自己最近的写锁。写锁的逻辑和我们之前 4.2 分析的依然保持不变。

锁超时

ZooKeeper 不需要配置锁超时,由于我们设置节点是临时节点,我们的每个机器维护着一个 ZK 的 Session,通过这个 Session,ZK 可以判断机器是否宕机。

如果我们的机器挂掉的话,那么这个临时节点对应的就会被删除,所以我们不需要关心锁超时。

ZK 小结:

  • 优点:ZK 可以不需要关心锁超时时间,实现起来有现成的第三方包,比较方便,并且支持读写锁,ZK 获取锁会按照加锁的顺序,所以其是公平锁。对于高可用利用 ZK 集群进行保证。
  • 缺点:ZK 需要额外维护,增加维护成本,性能和 MySQL 相差不大,依然比较差。并且需要开发人员了解 ZK 是什么。

Redis

大家在网上搜索分布式锁,恐怕最多的实现就是 Redis 了,Redis 因为其性能好,实现起来简单所以让很多人都对其十分青睐。

Redis 分布式锁简单实现

熟悉 Redis 的同学那么肯定对 setNx(set if not exist) 方法不陌生,如果不存在则更新,其可以很好的用来实现我们的分布式锁。

对于某个资源加锁我们只需要:

setNx resourceName value 

这里有个问题,加锁了之后如果机器宕机那么这个锁就不会得到释放所以会加入过期时间,加入过期时间需要和 setNx 同一个原子操作。

在 Redis 2.8 之前我们需要使用 Lua 脚本达到我们的目的,但是 Redis 2.8 之后 Redis 支持 nx 和 ex 操作是同一原子操作。

set resourceName value ex 5 nx 

Redission

Javaer 都知道 Jedis,Jedis 是 Redis 的 Java 实现的客户端,其 API 提供了比较全面的 Redis 命令的支持。

Redission 也是 Redis 的客户端,相比于 Jedis 功能简单。Jedis 简单使用阻塞的 I/O 和 Redis 交互,Redission 通过 Netty 支持非阻塞 I/O。

Jedis 最新版本 2.9.0 是 2016 年的快 3 年了没有更新,而 Redission 最新版本是 2018 年 10 月更新。

Redission 封装了锁的实现,其继承了 java.util.concurrent.locks.Lock 的接口,让我们像操作我们的本地 Lock 一样去操作 Redission 的 Lock。

下面介绍一下其如何实现分布式锁:

搞懂“分布式锁”,看这篇文章就对了

Redission 不仅提供了 Java 自带的一些方法(lock,tryLock),还提供了异步加锁,对于异步编程更加方便。

由于内部源码较多,就不贴源码了,这里用文字叙述来分析它是如何加锁的,这里分析一下 tryLock 方法:

①尝试加锁:首先会尝试进行加锁,由于需要兼容老版本的 Redis,所以不能直接使用 ex,nx 原子操作的 API,那么就只能使用 Lua 脚本,相关的 Lua 脚本如下:

搞懂“分布式锁”,看这篇文章就对了

可以看见它并没有使用我们的 sexNx 来进行操作,而是使用的 hash 结构,我们的每一个需要锁定的资源都可以看做是一个 HashMap,锁定资源的节点信息是 Key,锁定次数是 Value。

通过这种方式可以很好的实现可重入的效果,只需要对 Value 进行加 1 操作,就能进行可重入锁。当然这里也可以用之前我们说的本地计数进行优化。

②如果尝试加锁失败,判断是否超时,如果超时则返回 false。

③如果加锁失败之后,没有超时,那么需要在名字为 redisson_lock__channel+lockName 的 channel 上进行订阅,用于订阅解锁消息,然后一直阻塞直到超时,或者有解锁消息。

④重试步骤 1,2,3,直到最后获取到锁,或者某一步获取锁超时。

对于我们的 unlock 方法比较简单也是通过 lua 脚本进行解锁,如果是可重入锁,只是减 1。如果是非加锁线程解锁,那么解锁失败。

搞懂“分布式锁”,看这篇文章就对了

Redission 还有公平锁的实现,对于公平锁其利用了 list 结构和 hashset 结构分别用来保存我们排队的节点,和我们节点的过期时间,用这两个数据结构帮助我们实现公平锁,这里就不展开介绍了,有兴趣可以参考源码。

RedLock

我们想象一个这样的场景当机器 A 申请到一把锁之后,如果 Redis 主宕机了,这个时候从机并没有同步到这一把锁,那么机器 B 再次申请的时候就会再次申请到这把锁。

为了解决这个问题 Redis 作者提出了 RedLock 红锁的算法,在 Redission 中也对 RedLock 进行了实现。

搞懂“分布式锁”,看这篇文章就对了

通过上面的代码,我们需要实现多个 Redis 集群,然后进行红锁的加锁,解锁。

具体的步骤如下:

①首先生成多个 Redis 集群的 Rlock,并将其构造成 RedLock。

②依次循环对三个集群进行加锁,加锁的过程和 5.2 里面一致。

③如果循环加锁的过程中加锁失败,那么需要判断加锁失败的次数是否超出了最大值,这里的最大值是根据集群的个数,比如三个那么只允许失败一个,五个的话只允许失败两个,要保证多数成功。

④加锁的过程中需要判断是否加锁超时,有可能我们设置加锁只能用 3ms,第一个集群加锁已经消耗了 3ms 了。那么也算加锁失败。

⑤3,4 步里面加锁失败的话,那么就会进行解锁操作,解锁会对所有的集群在请求一次解锁。

可以看见 RedLock 基本原理是利用多个 Redis 集群,用多数的集群加锁成功,减少 Redis 某个集群出故障,造成分布式锁出现问题的概率。

Redis 小结:

  • 优点:对于 Redis 实现简单,性能对比 ZK 和 MySQL 较好。如果不需要特别复杂的要求,自己就可以利用 setNx 进行实现,如果自己需要复杂的需求的话,可以利用或者借鉴 Redission。对于一些要求比较严格的场景可以使用 RedLock。
  • 缺点:需要维护 Redis 集群,如果要实现 RedLock 需要维护更多的集群。

分布式锁的安全问题

上面我们介绍过红锁,但是 Martin Kleppmann 认为其依然不安全。

有关于 Martin 反驳的几点,我认为其实不仅仅局限于 RedLock,前面说的算法基本都有这个问题,下面我们来讨论一下这些问题。

长时间的 GC pause

熟悉 Java 的同学肯定对 GC 不陌生,在 GC 的时候会发生 STW(stop-the-world)。

例如 CMS 垃圾回收器,它会有两个阶段进行 STW 防止引用继续进行变化。那么有可能会出现下面图(引用至 Martin 反驳 Redlock 的文章)中这个情况:

搞懂“分布式锁”,看这篇文章就对了

client1 获取了锁并且设置了锁的超时时间,但是 client1 之后出现了 STW,这个 STW 时间比较长,导致分布式锁进行了释放。

client2 获取到了锁,这个时候 client1 恢复了锁,那么就会出现 client1,2 同时获取到锁,这个时候分布式锁不安全问题就出现了。

这个不仅仅局限于 RedLock,对于我们的 ZK,MySQL 一样的有同样的问题。

时钟发生跳跃

对于 Redis 服务器如果其时间发生了跳跃,肯定会影响我们锁的过期时间。

那么我们的锁过期时间就不是我们预期的了,也会出现 client1 和 client2 获取到同一把锁,也会出现不安全,这个对于 MySQL 也会出现。但是 ZK 由于没有设置过期时间,那么发生跳跃也不会受影响。

长时间的网络 I/O

这个问题和我们的 GC 的 STW 很像,也就是我们这个获取了锁之后我们进行网络调用,其调用时间由可能比我们锁的过期时间都还长,那么也会出现不安全的问题,这个 MySQL 也会有,ZK 也不会出现这个问题。

对于这三个问题,在网上包括 Redis 作者在内发起了很多讨论。

GC 的 STW

对于这个问题可以看见基本所有的都会出现问题,Martin 给出了一个解法,对于 ZK 这种他会生成一个自增的序列,那么我们真正进行对资源操作的时候,需要判断当前序列是否是最新,有点类似于乐观锁。

当然这个解法 Redis 作者进行了反驳,你既然都能生成一个自增的序列了那么你完全不需要加锁了,也就是可以按照类似于 MySQL 乐观锁的解法去做。

我自己认为这种解法增加了复杂性,当我们对资源操作的时候需要增加判断序列号是否是最新,无论用什么判断方法都会增加复杂度,后面会介绍谷歌的 Chubby 提出了一个更好的方案。

时钟发生跳跃

Martin 觉得 RedLock 不安全很大的原因也是因为时钟的跳跃,因为锁过期强依赖于时间,但是 ZK 不需要依赖时间,依赖每个节点的 Session。

Redis 作者也给出了解答,对于时间跳跃分为人为调整和 NTP 自动调整:

  • 人为调整:人为调整影响的完全可以人为不调整,这个是处于可控的。
  • NTP 自动调整:这个可以通过一定的优化,把跳跃时间控制在可控范围内,虽然会跳跃,但是是完全可以接受的。

长时间的网络 I/O

这一块不是他们讨论的重点,我自己觉得,对于这个问题的优化可以控制网络调用的超时时间,把所有网络调用的超时时间相加。

那么我们锁过期时间其实应该大于这个时间,当然也可以通过优化网络调用比如串行改成并行,异步化等。

Chubby 的一些优化

大家搜索 ZK 的时候,会发现他们都写了 ZK 是 Chubby 的开源实现,Chubby 内部工作原理和 ZK 类似。但是 Chubby 的定位是分布式锁和 ZK 有点不同。

Chubby 也是使用上面自增序列的方案用来解决分布式不安全的问题,但是它提供了多种校验方法:

  • CheckSequencer():调用 Chubby 的 API 检查此时这个序列号是否有效。
  • 访问资源服务器检查,判断当前资源服务器最新的序列号和我们的序列号的大小。
  • lock-delay:为了防止我们校验的逻辑入侵我们的资源服务器,其提供了一种方法当客户端失联的时候,并不会立即释放锁,而是在一定的时间内(默认 1min)阻止其他客户端拿去这个锁。

那么也就是给予了一定的 buffer 等待 STW 恢复,而我们的 GC 的 STW 时间如果比 1min 还长那么你应该检查你的程序,而不是怀疑你的分布式锁了。

小结

本文主要讲了多种分布式锁的实现方法,以及它们的一些优缺点。最后也说了一下关于分布式锁的安全的问题。

对于不同的业务需要的安全程度完全不同,我们需要根据自己的业务场景,通过不同的维度分析,选取最适合自己的方案。

一文弄懂“分布式锁”

多线程情况下对共享资源的操作需要加锁,避免数据被写乱,在分布式系统中,这个问题也是存在的,此时就需要一个分布式锁服务。常见的分布式锁实现一般是基于DB、Redis、zookeeper。下面笔者会按照顺序分析下这3种分布式锁的设计与实现,想直接看分布式锁总结的小伙伴可直接翻到文档末尾处。

分布式锁的实现由多种方式,但是不管怎样,分布式锁一般要有以下特点:

  • 排他性:任意时刻,只能有一个client能获取到锁
  • 容错性:分布式锁服务一般要满足AP,也就是说,只要分布式锁服务集群节点大部分存活,client就可以进行加锁解锁操作
  • 避免死锁:分布式锁一定能得到释放,即使client在释放之前崩溃或者网络不可达

除了以上特点之外,分布式锁最好也能满足可重入、高性能、阻塞锁特性(AQS这种,能够及时从阻塞状态唤醒)等,下面就话不多说,赶紧上(开往分布式锁的设计与实现的)车~

DB锁

在数据库新建一张表用于控制并发控制,表结构可以如下所示:

CREATE TABLE `lock_table` (
 `id` int(11) unsigned NOT NULL COMMENT '主键',
 `key_id` bigint(20) NOT NULL COMMENT '分布式key',
 `memo` varchar(43) NOT NULL DEFAULT '' COMMENT '可记录操作内容',
 `update_time` datetime NOT NULL COMMENT '更新时间',
 PRIMARY KEY (`id`,`key_id`),
 UNIQUE KEY `key_id` (`key_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

key_id作为分布式key用来并发控制,memo可用来记录一些操作内容(比如memo可用来支持重入特性,标记下当前加锁的client和加锁次数)。将key_id设置为唯一索引,保证了针对同一个key_id只有一个加锁(数据插入)能成功。此时lock和unlock伪代码如下:

def lock :
 exec sql: insert into lock_table(key_id, memo, update_time) values (key_id, memo, NOW())
 if result == true :
 return true
 else :
 return false
def unlock :
 exec sql: delete from lock_table where key_id = 'key_id' and memo = 'memo'

注意,伪代码中的lock操作是非阻塞锁,也就是tryLock,如果想实现阻塞(或者阻塞超时)加锁,只修反复执行lock伪代码直到加锁成功为止即可。基于DB的分布式锁其实有一个问题,那就是如果加锁成功后,client端宕机或者由于网络原因导致没有解锁,那么其他client就无法对该key_id进行加锁并且无法释放了。为了能够让锁失效,需要在应用层加上定时任务,去删除过期还未解锁的记录,比如删除2分钟前未解锁的伪代码如下:

def clear_timeout_lock :
 exec sql : delete from lock_table where update_time < ADDTIME(NOW(),'-00:02:00')

因为单实例DB的TPS一般为几百,所以基于DB的分布式性能上限一般也是1k以下,一般在并发量不大的场景下该分布式锁是满足需求的,不会出现性能问题。不过DB作为分布式锁服务需要考虑单点问题,对于分布式系统来说是不允许出现单点的,一般通过数据库的同步复制,以及使用vip切换Master就能解决这个问题。

以上DB分布式锁是通过insert来实现的,如果加锁的数据已经在数据库中存在,那么用select xxx where key_id = xxx for udpate方式来做也是可以的。

Redis锁

Redis锁是通过以下命令对资源进行加锁:

set key_id key_value NX PX expireTime

其中,set nx命令只会在key不存在时给key进行赋值,px用来设置key过期时间,key_value一般是随机值,用来保证释放锁的安全性(释放时会判断是否是之前设置过的随机值,只有是才释放锁)。由于资源设置了过期时间,一定时间后锁会自动释放。

set nx保证并发加锁时只有一个client能设置成功(Redis内部是单线程,并且数据存在内存中,也就是说redis内部执行命令是不会有多线程同步问题的),此时的lock/unlock伪代码如下:

def lock:
 if (redis.call('set', KEYS[1], ARGV[1], 'ex', ARGV[2], 'nx')) then
 return true
 end
 return false
 
def unlock:
 if (redis.call('get', KEYS[1]) == ARGV[1]) then
 redis.call('del', KEYS[1])
 return true
 end
 return false

分布式锁服务中的一个问题

如果一个获取到锁的client因为某种原因导致没能及时释放锁,并且redis因为超时释放了锁,另外一个client获取到了锁,此时情况如下图所示:

一文弄懂“分布式锁”

那么如何解决这个问题呢,一种方案是引入锁续约机制,也就是获取锁之后,释放锁之前,会定时进行锁续约,比如以锁超时时间的1/3为间隔周期进行锁续约。

关于开源的redis的分布式锁实现有很多,比较出名的有redisson、百度的dlock,关于分布式锁,笔者也写了一个简易版的分布式锁redis-lock,主要是增加了锁续约和可同时针对多个key加锁的机制。

对于高可用性,一般可以通过集群或者master-slave来解决,redis锁优势是性能出色,劣势就是由于数据在内存中,一旦缓存服务宕机,锁数据就丢失了。像redis自带复制功能,可以对数据可靠性有一定的保证,但是由于复制也是异步完成的,因此依然可能出现master节点写入锁数据而未同步到slave节点的时候宕机,锁数据丢失问题。

zookeeper分布式锁

ZooKeeper是一个高可用的分布式协调服务,由雅虎创建,是Google Chubby的开源实现。ZooKeeper提供了一项基本的服务:分布式锁服务。zookeeper重要的3个特征是:zab协议、node存储模型和watcher机制。通过zab协议保证数据一致性,zookeeper集群部署保证可用性,node存储在内存中,提高了数据操作性能,使用watcher机制,实现了通知机制(比如加锁成功的client释放锁时可以通知到其他client)。

zookeeper node模型支持临时节点特性,即client写入的数据时临时数据,当客户端宕机时临时数据会被删除,这样就不需要给锁增加超时释放机制了。当针对同一个path并发多个创建请求时,只有一个client能创建成功,这个特性用来实现分布式锁。注意:如果client端没有宕机,由于网络原因导致zookeeper服务与client心跳失败,那么zookeeper也会把临时数据给删除掉的,这时如果client还在操作共享数据,是有一定风险的。

基于zookeeper实现分布式锁,相对于基于redis和DB的实现来说,使用上更容易,效率与稳定性较好。curator封装了对zookeeper的api操作,同时也封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等,使用curator进行分布式加锁示例如下:

<!--引入依赖-->
<!--对zookeeper的底层api的一些封装-->
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>2.12.0</version>
</dependency>
 
<!--封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等-->
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>2.12.0</version>
</dependency>
public static void main(String[] args) throws Exception {
 String lockPath = "/curator_recipes_lock_path";
 CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.193.128:2181")
 .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
 client.start();
 InterProcessMutex lock = new InterProcessMutex(client, lockPath);
 Runnable task = () -> {
 try {
 lock.acquire();
 try {
 System.out.println("zookeeper acquire success: " + Thread.currentThread().getName());
 Thread.sleep(1000);
 } catch (Exception e) {
 e.printStackTrace();
 } finally {
 lock.release();
 }
 } catch (Exception ex) {
 ex.printStackTrace();
 }
 };
 ExecutorService executor = Executors.newFixedThreadPool(10);
 for (int i = 0; i < 1000; i++) {
 executor.execute(task);
 }
 LockSupport.park();
}

总结

从上面介绍的3种分布式锁的设计与实现中,我们可以看出每种实现都有各自的特点,针对潜在的问题有不同的解决方案,归纳如下:

  • 性能:redis > zookeeper > db。
  • 避免死锁:DB通过应用层设置定时任务来删除过期还未释放的锁,redis通过设置超时时间来解决,而zookeeper是通过临时节点来解决。
  • 可用性:DB可通过数据库同步复制,vip切换master来解决,redis可通过集群或者master-slave方式来解决,zookeeper本身自己是通过zab协议集群部署来解决的。注意,DB和redis的复制一般都是异步的,也就是说某些时刻分布式锁发生故障可能存在数据不一致问题,而zookeeper本身通过zab协议保证集群内(至少n/2+1个)节点数据一致性。
  • 锁唤醒:DB和redis分布式锁一般不支持唤醒机制(也可以通过应用层自己做轮询检测锁是否空闲,空闲就唤醒内部加锁线程),zookeeper可通过本身的watcher/notify机制来做。

使用分布式锁,安全性上和多线程(同一个进程内)加锁是没法比的,可能由于网络原因,分布式锁服务(因为超时或者认为client挂了)将加锁资源给删除了,如果client端继续操作共享资源,此时是有隐患的。因此,对于分布式锁,一个是尽量提高分布式锁服务的可用性,另一个就是要部署同一内网,尽量降低网络问题发生几率。这样来看,貌似分布式锁服务不是“完美”的(PS:技术貌似也不好做到十全十美 🙁 ),那么开发人员该如何选择分布式锁呢?最好是结合自己的业务实际场景,来选择不同的分布式锁实现,一般来说,基于redis的分布式锁服务应用较多。

restful最佳实践——接口规范

为了前后端分工明确,对接流畅,确保可读性和扩展性以及高可用、一致性,特约定下述无状态RESTful API规范:

写在前面

前后端分离意味着,前后端之间使⽤ JSON 来交流,两个开发团队之间使⽤ API 作为契约进⾏交互。从此,后台选⽤的技术栈不影响前台。当我们决定需要前后端分离时,我们仍然还需要⾯对⼀系列的问题:

  1. 是否⾜够的安全?我们怎么去存储⽤户数据,使⽤ LocalStorage 的话,还要考虑加密。采⽤哪种认证⽅式来让⽤户登录,并保存相应的状态?
  2. 是否有⾜够的技术来⽀撑前后端分离?有没有能⼒创建出符合 RESTful 风格的API?
  3. 是否有能⼒维护 API 接口?当前端或者后台需要修改接⼜时,是否能轻松地修改?前端和后台两个团队是不是很容易合作?是不是可以轻松地进⾏联调?
  4. 前后端职责是否能明确?即:后台提供数据,前端负责显⽰。
  5. 是否建⽴了前端的错误追踪机制?能否帮助我们快速地定位出问题。

前后端分离的核⼼:后台提供数据,前端负责显⽰

前提

RESTful API 统一约束客户端和服务器之间的接口。简化和分离系统架构,使每个模块独立!

请求中使用URI定位资源

用HTTP Verbs[动词](GET、POST、PUT、DELETE)描述操作(具体表现形式)

数据传递(默认)采用:Content-Type: application/json; charset=utf-8

Rest

REST即表述性状态传递(英文:Representational State Transfer,简称REST)是Roy Fielding博士在2000年他的博士论文中提出来的一种软件架构风格。它是一种针对网络应用的设计和开发方式,可以降低开发的复杂性,提高系统的可伸缩性。**REST是设计风格而不是标准。**REST通常基于使用HTTP,URI,和XML(标准通用标记语言下的一个子集)以及HTML(标准通用标记语言下的一个应用)

统一接口(Uniform Interface)

统一接口约束定义客户端和服务器之间的接口。它简化了分离的结构,使各部分独立发展。

无状态(Stateless)

REST要求状态要么被放入资源状态中,要么保存在客户端上。或者换句话说,服务器端不能保持除了单次请求之外的,任何与其通信的客户端的通信状态。从客户端的每个请求要包含服务器所需要的所有信息。这样做的最直接的理由就是可伸缩性—— 如果服务器需要保持客户端状态,那么大量的客户端交互会严重影响服务器的内存可用空间(footprint)。

缓存(Cachable)

服务器返回信息必须被标记是否可以缓存,如果缓存,客户端可能会重用之前的信息发送请求。

客户-服务器(Client-Server)

客户端无需关注数据存储,服务器端无需关注用户界面,提高了前后端可移植性。

分层系统(Layered System)

客户端不关心直接连接到最终服务器还是连接到中间服务器。中间服务器可以通过启用负载平衡和提供共享缓存来提高系统可扩展性。分层系统也可以执行安全策略。

支持按需代码(Code on Demand,可选)

服务器可以通过传输逻辑来临时扩展或定制客户端的功能。

URL规范

GET https//domain.com/api/{模块名}/{?菜单名}/{接口名}/:param

  1. 不能使用大写,用中横线 – 不用下划线 _ ;
  2. 使用名词表示资源集合,使用复数形式(为确保所有API URIs保持一致),不能使用动词;
  3. 每个资源都至少有一个标识它的URI,同时应该遵循一个可预测的层次结构来提高可理解性,从而提高可用性;
  4. 无需在URI中增加版本号,通过HTTP请求头信息的字段中进行区分(或者在URI包含主版本信息,同时请求头包含子版本信息。

Accept: vnd.example-com.foo+json;version=1.1Accept: vnd.example-com.foo+json; version=2.0

restful最佳实践——接口规范

Request

restful最佳实践——接口规范

说明:

  1. 安全性 :不会改变资源状态,可以理解为只读的;
  2. 幂等性 :执行1次和执行N次,对资源状态改变的效果是等价的。
  3. 查询字段内容过多,统一使用POST方式查询,请求地址增加/query加以区分
  4. 批量删除,统一使用POST方式,请求地址增加/delete加以区分

由于存在批量删除的情况,而一些网关、代理、防火墙在收到DELETE请求后,会把请求的body直接剥离掉。建议将存在批量删除的接口统一改成POST提交,为了标识是删除操作,在请求路径上增加/delete。

GET

被用于获取资源。不允许对服务器上资源做任何修改操作。

示例:

http://www.example.com/customers/12345
http://www.example.com/customers/12345/orders
http://www.example.com/buckets/sample

PUT

常用于更新资源。通过请求体携带资源发送给服务器。注意:在资源ID由客户端而不是由服务器选择的情况下,也可以使用PUT来创建资源。修改成功返回200,创建成功返回201。建议使用post进行创建新资源。

http://www.example.com/customers/12345
http://www.example.com/customers/12345/orders/98765
http://www.example.com/buckets/secret_stuff

POST

常用于创建新资源。创建成功通常返回201。

http://www.example.com/customers
http://www.example.com/customers/12345/orders

DELETE

删除资源。

http://www.example.com/customers/12345
http://www.example.com/customers/12345/orders
http://www.example.com/bucket

restful最佳实践——接口规范

其他

  • 排序

使用数组传递排序字段,-表示降序,无任何标识表示升序。

sorts: [‘-age’, ‘name’]

  • 时间传递

日期和时间戳如果没有适当和一致地处理,可能是一个真正的头痛。建议使用UTC或GMT时间存储,处理,缓存等时间戳或者使用统一格式化的时间字符串”yyyy-MM-dd HH:mm:ss”

Respone

状态码

restful最佳实践——接口规范

格式

  • 前后端交互字段全部使用小驼峰方式

// HTTP响应码(好多javascript框架并不会获取http状态码,所以包装到body中便于使用)
{ “code”: “200”, “status”: “success/fail/error”, // 见下述表格 “content/data”: []/{}, // 多条记录使用JSON数组,单条记录使用JSON对象 “message”: [] // 状态为error或fail时,对应的错误信息}

status说明

restful最佳实践——接口规范

示例

图表、下拉列表框

  • 图表、下拉列表框等建议统一key-name-value形式返回,这样对于图表来说可以统一处理,无需考虑业务性,增加了其复用性!

GET http://xxx.com/api/dashboard/se-count/:uid

{ “status”: “success”, “content”: [ { “key”: “low”, // 前后端交互使用关键字 “name”: “低级”, // 前端显示 “value”: 540 }, { “key”: “medium”, “name”: “中级”, “value”: 336 }, { “key”: “high”, “name”: “高级”, “value”: 92 } ], errorCode: “”, message: “”}

多条曲线

“status”: “success”, “content”:[ { “key”: “firewall”, “name”: “Firewall”, “value”: [ { “key”: 1508083200000, “name”: “3:00”, “value”: 23 }, { “key”: 1508094000000, “name”: “6:00”, “value”: 43 } ] }, { “key”: “vpn”, “name”: “VPN”, “value”: [ { “key”: 1508083200000, “name”: “3:00”, “value”: 31 }, { “key”: 1508094000000, “name”: “6:00”, “value”: 33 } ] } ]}

表格分页

请求

POST http://xxx.com/api/dashboard/se-by-source-ip

{ startTime: 1505977365777, endTime: 1506063765777, pageNum: 1, // 当前页码 pageSize: 20, // 每页条数 }

响应结果

{ “total”: 1, // 总条数 “totalPage”: 1, // (可选)总页数 “pageNum”: 1, // (可选)当前页码 “pageSize”: 20, // (可选)每页条数 “value”: [ { “field1”: “value”, “field2”: “value”, “field3”: “value”, “field4”: “value” } ]}

需要处理的问题

  • Get请求参数过多,或携带敏感信息
  • 批量删除,携带一组id信息
  • 文件导出、文件上传