网易乐得技术团队

贵金属推送实现及优化

贵金属群推第一版的实现方式

1
贵金属推送的设备存放在redis中,用的redis的sortedset数据结构,每次有符合条件的用户进来,就会往这个set添加设备信息或者更新设备信息在这个set中的位置,使得最近一段时间的活跃用户始终在队列最前面,方便过滤掉不活跃的用户设备。
如图1所示,第一版采用的是复制群推列表的方式,每次推送前将整个set列表复制一份,这里用的是redis的ZUNIONSTORE命令,每次需要推送时分页从redis set中取出4w设备,然后从set中删掉这批设备,最后新建一个线程并且将这个线程扔进线程池,每个线程主要操作是同步调用推送平台接口。这里复制出一个set列表目的就是为了避免在推送过程中有用户设备信息的变动,导致推送的时候set列表发生变化,最终可能会给某些用户重复推送。
这种实现方式最大的优点就是逻辑十分简单,每次需要调用群推接口时只需要将当前的设备列表复制一份,然后推送从复制的列表取设备,用户注册或者删除信息都在原始set列表上操作,不会修改复制出去的set列表。这样的方式有个严重的缺点,就是复制列表的耗时(复制一次在2秒左右(50w设备)),占用redis资源(使redis在2秒内不能处理其他请求),影响项目其他使用redis的功能。

贵金属群推第二版

1
针对最初的群推方案的缺点我们进行了一次优化如图3所示,每次推送前不必复制一个备份列表,直接在原列表上取数redis据,这样可以避免复制大量的设备数据。但是出现另外一个问题,在推送时列表可能会变,因为可能有用户删除或者新增加到这个列表,所以在推送时需要保证这个列表不变,并且用户的增加或者删除还得在推送后生效。
如图4所示,用户在注册或者删除列表时,会先读取key1中的值,如果大于0则说明有群推正在进行,如果小于0则说明没有群推在进行(这个需要进一步做确认)。首先如果有群推在进行则将用户操作缓存在redis中,如果没有群推在进行,则watch key2,然后启动redis事务对设备列表进行设备注册或者删除,如果redis返回结果不为空,则说明注册或者删除设备时的确没有群推在进行操作结果生效,如果返回结果为空说明有群推在进行操作结果失效(这部分是由redis乐观锁机制保证的),将操作缓存到redis。这种实现方式保证了在有推送的时候,设备列表是不变的,避免给用户重复推或者漏推的情形。
为了将用户操作生效,我们起了一个定时任务,定期处理缓存列表中的用户操作从而不会让用户的操作失效。
除了在获取设备列表的时候做了优化,另外我们将调用推送平台推送接口的时候也做了一下处理。由于我们在调用推送接口的时候不需要关心接口返回值是否成功,只需要正确调用接口就行。所以利用异步http链接池可能会提高推送速度。因为异步http链接池不需要等待返回结果就可以马上处理后续逻辑,有利于提高处理速度。采用链接池不需要在每次调用推送接口时重新建立http链接,也可以减少推送时间,并且可以减少新建http链接带来的资源开销。
这个优化大大减少了推送耗时,原来推送50万左右的设备主线程需要10秒左右,优化后只需要4秒左右,并且还解决了一个redis的隐患。

贵金属群推第三版

经过第二次优化后在推送流程上改进空间不大了,在设备少的时候,现在的推送效率差不多够用了。但是当设备达到持续增加时,目前这种单线程的处理方式的性能可能会急剧下降,这对于贵金属业务的行情推送来说是不太乐观的,所以希望继续优化一下这一块。想要继续提高推送效率,只能朝多线程方向或者多机同时推送。由于多机相对比较复杂并且我们用户量还比较少,目前先弄了一个多线程的版本。
多线程改进的主要思路:群推的目的就是在短时间内向大量的用户推送一批信息。当用户数量比较大的时候,需要从redis中取设备的数据变得很多,由于我们是分批取,所以取的次数就会变得很多。调用推送平台接口也是分批调用,目前是1000个设备调用一次。目前的群推是单线程,每次推送之前,先锁住redis中的set列表,然后每次从redis取出前n个设备后,新建一个线程,扔进一个线程池。线程里面每1000设备调用推送平台接口。然后重复,直到把redis取完所有设备,所有设备取完后再释放锁。考虑到我们推送的时候设备列表在redis中是不变的(列表被锁住了),在推送的时候,首先获取set表中数据量t,把推送set集合分成m段(比如每段2w,m近似为t/m),然后把每段在列表的起始位置分配给不同的线程,让每个线程处理自己的那段用户数据互不干扰。这样同一时间内就可能有机器cpu个数的线程去推送,可以提高推送的效率。由于推送涉及到锁,需要推送结束后释放资源,让用户的操作能顺利进行,所以得等到所有推送任务都结束后释放资源,这里用到了java中的CountDownLatch。

推送优化结果分析

由于推送在改版的时候,贵金属的群推规则也在不断改进,推送朝着更加精准的方向发展,导致在进行三版优化的时候推送用户量没有以前多,只有30w左右。所以性能对比就只能分开对比,第一版和第二版对比,第二版和第三版对比。对比结果如下:

推送版本 设备数量/w 取设备耗时/ms 取设备额外耗时/ms
第一版 4 625 2000
第二版 4 293 0

表-1 redis取设备信息耗时(在线环境)
由表1可以看出在第二版优化后每次取4w设备的时间将近减少了一半,并且没有copy列表所花费的额外的2秒。

推送版本 设备数/w 总耗时 优化方式
第一版 50 10s
第二版 50 4s 去掉复制,利用redis乐观锁及缓冲队列,异步http链接池

表-2(在线环境)
表格2可以看出,同样条件下,相比第一版推送,50w设备推送一次耗时减少6s。
第二版推送与第一版推送相比优化的是推送redis操作部分时间,将copy redis列表的操作时间从2000ms减少到0,将每取4w设备操作时间从625ms优化到293ms。已经4w设备的推送的优化,由原来的同步http链接换成了异步http链接池,极大的减少了每次调用推送平台接口的时间,从而减少了每4w设备的处理时间。总体上的推送时间比第一版推送提高了1.5倍。
由于在线环境的设备数量不是很多,所以第三版优化采用了一组测试环境的数据,一下是贵金属测试环境,但是tomcat所在机器
性能与在线环境一样:

推送版本 设备数量/w 总耗时/s
第二版 40 9
第三版 40 6
第二版 100 21
第三版 100 13

表-3(测试环境)
由表格三可以看出,在同样的环境推送40万设备的时候第三版推送耗时比第二版的耗时少了3s。推100万的时候第三版比第二版少用了8秒,效率第三版明显比第二版高。

推送版本 设备数量/w 总耗时/s
第二版 42 4
第三版 42 3

表-4
线上环境的推送两者之间差别不大,是由
于线上环境从redis中读取设备时耗时不大,每读取2w设备在100ms左右,并且设备也不是那么多,而本次优化的地方就是redis中设备的读取,所以线上优化后的时间只比优化前的少1秒。依照测试环境和在线环境40w设备推送结果对比,可以大概估计出在线第三版推送,推送100w设备的时间10秒以内。

后续优化方向

如果设备数量急剧增加,达到一定数量量后,单机多线程的推送肯定也会耗时比较长。因为一台机器的cpu数,io数有限。单纯增加线程去推送,会有很多线程在等待cpu,io资源导致推送会耗时比较大。所以后续优化方向可以考略多台机器同时推送。方法类似多线程,首先利用redis乐观锁锁住设备列表,然后给每台机器分配一定数量的设备,等待所有机器都推送完后再释放锁。

参考代码

异步http链接池接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 异步HTTP请求
* @param url 地址
* @param charSet 编码
* @param method 请求方法:0 GET,1 POST
* @param params 参数
* @param connectionTime 链接超时
* @param readTimeOut 读超时
* @param callback 回调函数
*/
public void sendAsynRequest(String url, String charSet, int method, Map<String,String> params,int connectionTime,
int readTimeOut, AsyncHttpStringCallback callback){
HttpContext httpContext = new HttpContext();
try {
if (StringUtils.isEmpty(url)) {
LOGGER.error("[sendAsynPost] error url = {}", url);
} else {
httpContext.setUrl(url);
}
if (StringUtils.isEmpty(charSet)){
charSet = defaultCharSet;
}
if (method >= 0 && method <= 1) {
httpContext.setMethod(method);
} else {
LOGGER.error("[sendAsynPost] error method = {}", method);
}
if(connectionTime > 0 ){
httpContext.setConnectTimeout(connectionTime);
}
if (readTimeOut > 0){
httpContext.setRequestTimeout(readTimeOut);
}
httpContext.setContent(getContent(params,charSet));
if (asyncHttpClientManager != null) {
asyncHttpClientManager.execute(httpContext, callback);
} else {
LOGGER.error("[sendAsynPost] error asyncHttpClientManager is null,{}", ObjectMapperUtils.toJson(asyncHttpClientManager));
}
} catch (Exception e) {
LOGGER.error("[sendAsynRequest] error url = {},params = {}", url, ObjectMapperUtils.toJson(params), e);
}
此方法为异步http链接池接口,asyncHttpClientManager实现采用的电影票封装的apache的异步http链接池接口。这个方法调用后可以立即返回,不用等待结果,待结果返回后会调用callback的方法,如果返回成功会调用completed方法,如果失败会调用failed方法。Callback需要实现AsyncHttpStringCallback抽象类。  

设备注册接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@JedisWay
public boolean setCustomSubWithWatch(CustomKeyModel customKeyModel, CustomValueModel customValueModel) {
boolean flag = false;
String keyStr = customKeyModel.toRedisKey();
String valueStr = customValueModel.toJson();
Jedis jedis = getJedis();
Object object = null;
try {
jedis.watch(PushConstants.PUSH_BATCH_SENTINEL_PRE_KEY_TWO + customKeyModel.getType());
String str = jedis.get(PushConstants.PUSH_BATCH_SENTINEL_PRE_KEY_ONE + customKeyModel.getType());
String signListKey = PushConstants.PUSH_BATCH_SIGN_LIST;
int num = NumberUtils.getIntFromStr(str);
if (num > 0) {
String message = ObjectMapperUtils.toJson(customKeyModel) + ";" + ObjectMapperUtils.toJson(customValueModel);
jedis.sadd(signListKey, message);
LOGGER.info("set custom sub block: [" + keyStr + "," + valueStr + "],num={}", num);
} else {
Transaction tx = jedis.multi();
tx.zadd(keyStr, System.currentTimeMillis(), valueStr);
object = tx.exec();
if (object == null) {
String message = ObjectMapperUtils.toJson(customKeyModel) + ";" + ObjectMapperUtils.toJson(customValueModel);
jedis.sadd(signListKey, message);
LOGGER.info("set custom sub block: [" + keyStr + "," + valueStr + "]");
} else {
LOGGER.info("set custom sub success: [" + keyStr + "," + valueStr + "]");
flag = true;
}
}
} catch (Exception e) {
LOGGER.error("[setCustomSubWithWatch] error customKeyModel = {},customValueModel = {}",
ObjectMapperUtils.toJson(customKeyModel), ObjectMapperUtils.toJson(customValueModel),e);
throw e;
}
return flag;
}

此接口主要是利用了redis的乐观锁和事务来保持和群推操作的一致性,避免了出现有群推正在进行时用户修改列表数据。

删除设备接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@JedisWay
public boolean delCustomSubWithWatch(CustomKeyModel customKeyModel, CustomValueModel customValueModel) {
boolean flag = false;
try {
String keyStr = customKeyModel.toRedisKey();
String valueStr = customValueModel.toJson();
Object object = null;
Jedis jedis = getJedis();
jedis.watch(PushConstants.PUSH_BATCH_SENTINEL_PRE_KEY_TWO + customKeyModel.getType());
String str = jedis.get(PushConstants.PUSH_BATCH_SENTINEL_PRE_KEY_ONE + customKeyModel.getType());
String closeListKey = PushConstants.PUSH_BATCH_CLOSE_LIST;
int num = NumberUtils.getIntFromStr(str);
if (num > 0) {
String message = ObjectMapperUtils.toJson(customKeyModel) + ";" + ObjectMapperUtils.toJson(customValueModel);
jedis.sadd(closeListKey, message);
LOGGER.info("set custom sub block: [" + keyStr + "," + valueStr + "],num = {}", num);
} else {
Transaction tx = jedis.multi();
tx.zrem(keyStr, valueStr);
object = tx.exec();
if (object == null) {
String message = ObjectMapperUtils.toJson(customKeyModel) + ";" + ObjectMapperUtils.toJson(customValueModel);
jedis.sadd(closeListKey, message);
LOGGER.info("set custom sub block: [" + keyStr + "," + valueStr + "]");
} else {
LOGGER.info("del custom sub success: [" + keyStr + "," + valueStr + "]");
flag = true;
}
}
} catch (Exception e) {
LOGGER.error("[delCustomSubWithWatch] error customKeyModel = {},customValueModel = {}",
ObjectMapperUtils.toJson(customKeyModel), ObjectMapperUtils.toJson(customValueModel),e);
throw e;
}
return flag;
}

此方法同注册设备列表方法,只不过处理的是用户删除设备的操作。

处理用户操作的定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void dealSignList() {
String message = customRedis.spop(PushConstants.PUSH_BATCH_SIGN_LIST);
CustomKeyModel customKeyModel = null;
CustomValueModel customValueModel = null;
while (!StringUtils.isEmpty(message)) {
try {
String[] strs = message.split(";");
customKeyModel = ObjectMapperUtils.toObject(strs[0], CustomKeyModel.class);
customValueModel = ObjectMapperUtils.toObject(strs[1], CustomValueModel.class);
if (!isOff(customValueModel.getDeviceId(), customKeyModel.getType())) {
logger.info("[dealSignList] user had shutdown unique = {},type={}", customValueModel.getDeviceId(), customKeyModel.getType());
} else {
if (customKeyModel != null && customValueModel != null) {
if (!customRedis.setCustomSubWithWatch(customKeyModel, customValueModel)) { //防止过于频繁访问REDIS
Thread.sleep(3000);
}
}
}
} catch (Exception e) {
if (!(e instanceof BusinessException)) {
customRedis.sadd(PushConstants.PUSH_BATCH_SIGN_LIST, message);
}
logger.error("[dealSignList] error message = {}", message, e);
}
message = customRedis.spop(PushConstants.PUSH_BATCH_SIGN_LIST);
logger.info("[dealSignList] customKeyModel = {},customValueModel={}", ObjectMapperUtils.toJson(customKeyModel),
ObjectMapperUtils.toJson(customValueModel));
}
}

此方法定时扫描redis key= push_batch_sign_list 的list列表,如果pop出来的值不为空,则说明redis中缓存了用户注册操作。经过判断用户开关状态再再决定是否需要进行用户设备注册操作。如果开关是开的,则调用setCustomSubWithWatch接口进行注册操作。否则直接放弃,去下一个用户操作,重复执行以上操作。

处理用户删除操作定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void dealCloseList() {
String message = customRedis.spop(PushConstants.PUSH_BATCH_CLOSE_LIST);
CustomKeyModel customKeyModel = null;
CustomValueModel customValueModel = null;
while ((!StringUtils.isEmpty(message))) {
try {
String[] strs = message.split(";");
customKeyModel = ObjectMapperUtils.toObject(strs[0], CustomKeyModel.class);
customValueModel = ObjectMapperUtils.toObject(strs[1], CustomValueModel.class);
if (customKeyModel != null && customValueModel != null) {
if (isOff(customValueModel.getDeviceId(), customKeyModel.getType())) {
logger.info("[dealCloseList] user had opened unique = {},type={}", customValueModel.getDeviceId(), customKeyModel.getType());
} else {
if (!customRedis.delCustomSubWithWatch(customKeyModel, customValueModel)) { //防止过于频繁访问REDIS
Thread.sleep(3000);
}
}
}
} catch (Exception e) {
if (!(e instanceof BusinessException)) {
customRedis.sadd(PushConstants.PUSH_BATCH_SIGN_LIST, message);
}
logger.error("[dealCloseList] error message = {}", message, e);
}
logger.info("[dealCloseList] customKeyModel = {},customValueModel={}", ObjectMapperUtils.toJson(customKeyModel),
ObjectMapperUtils.toJson(customValueModel));
message = customRedis.spop(PushConstants.PUSH_BATCH_CLOSE_LIST);
}
}
逻辑同处理用户注册操作的定时任务,只不过处理的是用户删除设备操作。  

群推接口第二版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@Override
public long pushBatchMessageNew(MessagePushType type, DeviceType[] deviceTypes, String title, String msg,
String url, String overTimeDays, String jobId, Map<String, Object> extraParams, String partnerId) {
long stage1 = System.currentTimeMillis();
List<CustomValueModel> valueModels;
int push_size = IniBean.getIniIntValue(IniConstant.REDIS_BATCH_READ_SIZE, 10000);
long count = 0;
int countNum = 0;
for (String product : MobAppConstant.PRODUCT_TYPES) {
try {
while (!setBatchPushSentinel(type.getValue()) && countNum < 10) {
countNum++;
LOGGER.info("[pushBatchMessageNew] set sentinel failed countNum = {}", countNum);
}
if (countNum >= 10) {
LOGGER.error("[pushBatchMessageNew] waring push lock error countNum = {}", countNum);
}
int taskCountAnd = 0;
int taskCountIos = 0;
long totalCount = 0;
long totalLogCunt = 0;
for(DeviceType deviceType:deviceTypes){
totalCount = customRedisService.zcount(type,deviceType,partnerId,product);
totalLogCunt += totalCount;
if(deviceType.equals(DeviceType.ANDROID)) {
if(totalCount % 20000 == 0) {
taskCountAnd = (int) totalCount / 20000;
}else{
taskCountAnd = (int) totalCount / 20000 + 1;
}
}else if(deviceType.equals(DeviceType.IPHONE)){
if(totalCount % 20000 == 0) {
taskCountIos = (int) totalCount / 20000;
}else{
taskCountIos = (int) totalCount / 20000 + 1;
}
}
}
CountDownLatch countDownLatch = new CountDownLatch(taskCountAnd + taskCountIos);
for(int i = 0; i < taskCountAnd; i ++){
int startNum = i * 20000;
int endNum = (i + 1) * 20000;
//防止重复推送,redis 命令zrangebyscore起始分数的记录都会获取到
if(i != 0){
startNum += 1;
}
PushThread pushThread = new PushThread(type,partnerId,product,DeviceType.ANDROID,startNum,endNum,
push_size,title,url,msg,overTimeDays,jobId,extraParams,countDownLatch);
pushThread.setName("push-thread-android-" + i);
PushThreadPool.execBatchPush(pushThread);
}
for(int i = 0; i < taskCountIos; i ++){
int startNum = i * 20000;
int endNum = (i + 1) * 20000;
//防止重复推送,redis 命令zrangebyscore起始分数的记录都会获取到
if(i != 0){
startNum += 1;
}
PushThread pushThread = new PushThread(type,partnerId,product,DeviceType.IPHONE,startNum,endNum,
push_size,title,url,msg,overTimeDays,jobId,extraParams,countDownLatch);
pushThread.setName("push-thread-ios-" + i);
PushThreadPool.execBatchPush(pushThread);
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
LOGGER.error("[pushBatchMessageNew] wait error",e);
}
long stage7 = System.currentTimeMillis();
LOGGER.info("[pushBatchMessageNew] total cost = {},taskCountAnd = {},taskCountIos = {}," +
"totalLogCount={}", stage7 - stage1,taskCountAnd,taskCountIos,totalLogCunt);
} finally {
int decrCount = 0;
while (!decrBatchPushSentinel(type.getValue()) && count < 1000) {
decrCount++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.info("[decrBatchPushSentinel] error", e);
}
}
if (decrCount >= 1000) {
LOGGER.error("[decrBatchPushSentinel] decr sentinel error please delete it manual");
}
}
}
return count;
}

此接口先对redis中的key1,key2值加一,然后循环从redis中取4w设备,直到取到的设备为空,然后将每个设备列表组装成推送平台需要的参数格式,对后调用线程池处理4w设备。

第三版群推接口:

@Override
    public long pushBatchMessageNew(MessagePushType type, DeviceType[] deviceTypes, String title, String msg,
            String url, String overTimeDays, String jobId, Map<String, Object> extraParams, String partnerId) {

        long stage1 = System.currentTimeMillis();
        List<CustomValueModel> valueModels;
        int push_size = IniBean.getIniIntValue(IniConstant.REDIS_BATCH_READ_SIZE, 10000);
        long count = 0;
        int countNum = 0;
            for (String product : MobAppConstant.PRODUCT_TYPES) {
                try {
                    while (!setBatchPushSentinel(type.getValue()) && countNum < 10) {
                        countNum++;
                        LOGGER.info("[pushBatchMessageNew] set sentinel failed countNum = {}", countNum);
                    }
                    if (countNum >= 10) {
                        LOGGER.error("[pushBatchMessageNew] waring push lock error countNum = {}", countNum);
                    }

                    int taskCountAnd = 0;
                    int taskCountIos = 0;
                    long totalCount = 0;
                    long totalLogCunt = 0;

                    for(DeviceType deviceType:deviceTypes){
                        totalCount = customRedisService.zcount(type,deviceType,partnerId,product);
                        totalLogCunt += totalCount;
                        if(deviceType.equals(DeviceType.ANDROID)) {
                            if(totalCount % 20000 == 0) {
                                taskCountAnd = (int) totalCount / 20000;
                            }else{
                                taskCountAnd = (int) totalCount / 20000 + 1;
                            }
                        }else if(deviceType.equals(DeviceType.IPHONE)){
                            if(totalCount % 20000 == 0) {
                                taskCountIos = (int) totalCount / 20000;
                            }else{
                                taskCountIos = (int) totalCount / 20000 + 1;
                            }
                        }
                    }
                    CountDownLatch countDownLatch = new CountDownLatch(taskCountAnd + taskCountIos);
                    for(int i = 0; i < taskCountAnd; i ++){
                        int startNum = i * 20000;
                        int endNum = (i + 1) * 20000;
                        //防止重复推送,redis 命令zrangebyscore起始分数的记录都会获取到
                        if(i != 0){
                            startNum += 1;
                        }
                        PushThread pushThread = new PushThread(type,partnerId,product,DeviceType.ANDROID,startNum,endNum,
                                push_size,title,url,msg,overTimeDays,jobId,extraParams,countDownLatch);
                        pushThread.setName("push-thread-android-" + i);
                        PushThreadPool.execBatchPush(pushThread);
                    }
                    for(int i = 0; i < taskCountIos; i ++){
                        int startNum = i * 20000;
                        int endNum = (i + 1) * 20000;
                        //防止重复推送,redis 命令zrangebyscore起始分数的记录都会获取到
                        if(i != 0){
                            startNum += 1;
                        }
                        PushThread pushThread = new PushThread(type,partnerId,product,DeviceType.IPHONE,startNum,endNum,
                                push_size,title,url,msg,overTimeDays,jobId,extraParams,countDownLatch);
                        pushThread.setName("push-thread-ios-" + i);
                        PushThreadPool.execBatchPush(pushThread);

                    }
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        LOGGER.error("[pushBatchMessageNew] wait error",e);
                    }
                    long stage7 = System.currentTimeMillis();
                    LOGGER.info("[pushBatchMessageNew] total cost = {},taskCountAnd = {},taskCountIos = {}," +
                            "totalLogCount={}", stage7 - stage1,taskCountAnd,taskCountIos,totalLogCunt);

                } finally {
                    int decrCount = 0;
                    while (!decrBatchPushSentinel(type.getValue()) && count < 1000) {
                        decrCount++;
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            LOGGER.info("[decrBatchPushSentinel] error", e);
                        }
                    }
                    if (decrCount >= 1000) {
                        LOGGER.error("[decrBatchPushSentinel] decr sentinel error please delete it manual");
                    }
                }
            }
        return count;
    }

每段设备的处理线程:

private class PushThread extends Thread{
       private int startNum = 0;
       private int endNum = 0;
       private int pageSize;
       private MessagePushType type;
       private String partnerId;
       private String product;
       private DeviceType deviceType;
       private String title;
       private String msg;
       private String overTimeDays;
       private String url;
       private String jobId;
       private Map<String,Object> extraParams;
       private CountDownLatch countDownLatch;


       public PushThread(MessagePushType type,String partnerId,String product,DeviceType deviceType,int startNum,
                         int endNum,int pageSize,String title,String url,String msg,String overTimeDays,String jobId,
                         Map<String,Object> extraParams,CountDownLatch countDownLatch){
           this.startNum = startNum;
           this.endNum = endNum;
           this.partnerId = partnerId;
           this.product = product;
           this.type = type;
           this.deviceType = deviceType;
           this.pageSize = pageSize;
           this.title = title;
           this.msg = msg;
           this.url = url;
           this.extraParams = extraParams;
           this.jobId = jobId;
           this.overTimeDays = overTimeDays;
           this.countDownLatch = countDownLatch;
       }
       @Override
       public void run() {

           List<CustomValueModel> valueModels = null;
           while (startNum < endNum){
               int tmpEndNum = startNum + pageSize;
               if(tmpEndNum > endNum){
                   tmpEndNum = endNum;
               }
               long stage1 = System.currentTimeMillis();
               valueModels = customRedisService.zrangeCustomSubRange(type,deviceType ,startNum,tmpEndNum ,
                       partnerId, product);
               long stage2 = System.currentTimeMillis();
               PlatformBatchModel model = new PlatformBatchModel(title, msg, overTimeDays, url, jobId,
                       extraParams, product);
               int tmpCount = 0;
               int tmpTotalCount = valueModels.size();
               for (CustomValueModel valueModel:valueModels) {
                   model.addDevice(new PlatformDeviceModel(valueModel.getDeviceId(),
                           valueModel.getTokenId(), String.valueOf(valueModel.getPushType()),
                           model.getJobId(), model.getExtraPara()));
                   if(++tmpCount % 1000 == 0){    //1000个一批
                       PlatformPushUtils.pushBatchMsgWithAsynHttp(model);
                       model = new PlatformBatchModel(title, msg, overTimeDays, url, jobId,
                               extraParams, product);
                   }else if(tmpCount == tmpTotalCount){    //最后一批不足1000时
                       PlatformPushUtils.pushBatchMsgWithAsynHttp(model);
                       model = new PlatformBatchModel(title, msg, overTimeDays, url, jobId,
                               extraParams, product);
                   }

               }
               LOGGER.info("[PushThread] product = {},startNum = {},endNum = {},redis cost = {},size = {}",
                       product,startNum,endNum,stage2 - stage1,valueModels);
               startNum = tmpEndNum + 1;
           }
           countDownLatch.countDown();
       }
   }

同第二版推送,推送开始前先对redis中的key1,key2值加一,然后根据每个redis中没中类型设备数计算出需要多少个线程完成此次推送。计算完后将redis列表划分为多段,然后分给多个线程进行处理。由于需要推送完后释放锁,所以这里用了CountDownLatc,主线程启动后等待,直到所有子线程任务完成后减少计数值,计数值为0就说明所有推送子任务都完成了,最后将key1减1,推送完成。