package com.diagnose.common.service; import com.alibaba.fastjson.JSONObject; import com.diagnose.common.entity.ScheduleLog; import com.hazelcast.collection.ISet; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.crdt.pncounter.PNCounter; import com.hazelcast.instance.impl.HazelcastInstanceProxy; import com.hazelcast.map.IMap; import com.diagnose.common.config.cache.CacheKey; import com.diagnose.common.handler.BizException; import com.diagnose.common.util.logger.LoggerUtil; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.stereotype.Component; import javax.annotation.Nullable; import javax.annotation.Resource; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static com.diagnose.common.config.cache.CacheKey.ONLY_KEY_OBJ; @Component public class CacheService { @Resource private ScheduleLogService scheduleLogService; @Resource private HazelcastInstance hazelcastInstance; private static final int UNLOCK_SLEEP_TIME = 5000; private static final Set loadedSet = new HashSet<>(); /** * 从缓存加载数据,未找到则执行load回调 * * @param mapName cacheMap的名字 * @param key cacheKey * @param load 加载数据回调 * @param 数据类型 * @return */ public T get(String mapName, String key, Callable load) { Map cacheMap = hazelcastInstance.getMap(mapName); return this.get(cacheMap, key, load); } /** * 从缓存加载数据,未找到则执行load回调 * * @param cacheMap cacheMap * @param key cacheKey * @param load 加载数据回调 * @param 数据类型 * @return */ public T get(Map cacheMap, String key, Callable load) { Object obj = cacheMap.get(key); if (obj == null) { T result; try { result = load.call(); } catch (BizException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); } if (result == null) { // 防止缓存穿透 cacheMap.put(key, ONLY_KEY_OBJ); return null; } cacheMap.put(key, result); return result; } else if (ONLY_KEY_OBJ.equals(obj)) { return null; } return (T) obj; } public ISet getSet(String key, Callable> load) { ISet iSet = hazelcastInstance.getSet(key); if (loadedSet.contains(key) || !iSet.isEmpty()) { loadedSet.add(key); return iSet; } simpleLock("task-getSet-" + key, 0, TimeUnit.SECONDS, 10, TimeUnit.SECONDS, () -> { try { Set set = load.call(); iSet.addAll(set); loadedSet.add(key); } catch (BizException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); } }); return iSet; } public IMap getMap(String key, Callable> load) { IMap iMap = hazelcastInstance.getMap(key); if (loadedSet.contains(key) || !iMap.isEmpty()) { loadedSet.add(key); return iMap; } simpleLock("task-getMap-" + key, 0, TimeUnit.SECONDS, 10, TimeUnit.SECONDS, () -> { try { Map map = load.call(); iMap.clear(); iMap.putAll(map); loadedSet.add(key); } catch (BizException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); } }); return iMap; } public Integer getLong(String key, Callable load, int delta) { PNCounter counter = hazelcastInstance.getPNCounter(key); long v = counter.get(); if (loadedSet.contains(key) || v != 0) { loadedSet.add(key); if (delta == 0) { return (int) v; } return (int) counter.addAndGet(delta); } simpleLock("task-getLong-" + key, 0, TimeUnit.SECONDS, 10, TimeUnit.SECONDS, () -> { try { int value = load.call(); counter.reset(); counter.addAndGet(value + delta); loadedSet.add(key); } catch (BizException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); } }); return (int) counter.get(); } /** * 如果key不存在则执行load回调并返回执行结果,如果存在返回默认值(一般用于防止缓存穿透) * * @param cacheMap cacheMap * @param key cacheKey * @param load 回调方法 * @param defaultValue 默认值 * @param 数据类型 * @return */ public T runNx(Map cacheMap, String key, Callable load, T defaultValue) { Object obj = cacheMap.get(key); if (obj == null) { T result; try { result = load.call(); } catch (Exception e) { throw new RuntimeException(e); } // 防止缓存穿透 cacheMap.put(key, ONLY_KEY_OBJ); return result; } return defaultValue; } /** * 多线程锁 * * @param taskName 任务名字 * @param time 加锁时间值 * @param timeunit 加锁时间类型 * @param leaseTime 自动释放时间值 * @param leaseTimeunit 自动释放时间类型 * @param fn 加锁成功后执行的方法 * @param 类型 */ public void lock(String taskName, long time, @Nullable TimeUnit timeunit, long leaseTime, @Nullable TimeUnit leaseTimeunit, Callable fn) { try { IMap map = hazelcastInstance.getMap(CacheKey.DISTRIBUTED_LOCK); boolean lock = map.tryLock(taskName, time, timeunit, leaseTime, leaseTimeunit); if (lock) { String host = ((HazelcastInstanceProxy) hazelcastInstance).getOriginal().getLocalEndpoint().getAddress().getHost(); Integer logId = null; long runTime = 0; try { logId = scheduleLogService.save(ScheduleLog.start(taskName, host)); long startTime = System.currentTimeMillis(); LoggerUtil.data.info(taskName + "-开始"); T result = fn.call(); runTime = System.currentTimeMillis() - startTime; String resultJSON = JSONObject.toJSONString(result); LoggerUtil.data.info(taskName + "-结束:" + runTime, resultJSON); scheduleLogService.save(ScheduleLog.success(logId, result == null ? null : resultJSON)); } catch (Exception e) { LoggerUtil.error.error(taskName + "-异常:" + ExceptionUtils.getStackTrace(e)); scheduleLogService.save(ScheduleLog.error(logId, ExceptionUtils.getStackTrace(e))); } finally { if (runTime >= UNLOCK_SLEEP_TIME) { map.forceUnlock(taskName); } else if (lock) { TimeUnit.MILLISECONDS.sleep(UNLOCK_SLEEP_TIME - runTime); if (map.isLocked(taskName)) { map.unlock(taskName); } } } } } catch (Exception e) { LoggerUtil.error.error(taskName + "-异常:" + ExceptionUtils.getStackTrace(e)); } } /** * 多线程锁 * * @param taskName 任务名字 * @param time 加锁时间值 * @param timeunit 加锁时间类型 * @param leaseTime 自动释放时间值 * @param leaseTimeunit 自动释放时间类型 * @param fn 加锁成功后执行的方法 */ public void lock(String taskName, long time, @Nullable TimeUnit timeunit, long leaseTime, @Nullable TimeUnit leaseTimeunit, Runnable fn) { try { IMap map = hazelcastInstance.getMap(CacheKey.DISTRIBUTED_LOCK); long runTime = 0; boolean lock = map.tryLock(taskName, time, timeunit, leaseTime, leaseTimeunit); if (lock) { String host = ((HazelcastInstanceProxy) hazelcastInstance).getOriginal().getLocalEndpoint().getAddress().getHost(); Integer logId = null; try { logId = scheduleLogService.save(ScheduleLog.start(taskName, host)); long startTime = System.currentTimeMillis(); LoggerUtil.data.info(taskName + "-开始"); fn.run(); runTime = System.currentTimeMillis() - startTime; LoggerUtil.data.info(taskName + "-结束:" + runTime); scheduleLogService.save(ScheduleLog.success(logId, null)); } catch (Exception e) { LoggerUtil.error.error(taskName + "-异常:" + ExceptionUtils.getStackTrace(e)); scheduleLogService.save(ScheduleLog.error(logId, ExceptionUtils.getStackTrace(e))); } finally { if (runTime >= UNLOCK_SLEEP_TIME) { map.forceUnlock(taskName); } else if (lock) { TimeUnit.MILLISECONDS.sleep(UNLOCK_SLEEP_TIME - runTime); if (map.isLocked(taskName)) { map.unlock(taskName); } } } } } catch (Exception e) { LoggerUtil.error.error(taskName + "-异常:" + ExceptionUtils.getStackTrace(e)); } } public void simpleLock(String key, long time, @Nullable TimeUnit timeunit, long leaseTime, @Nullable TimeUnit leaseTimeunit, Runnable fn) { IMap map = hazelcastInstance.getMap(CacheKey.DISTRIBUTED_LOCK); boolean lock = false; try { lock = map.tryLock(key, time, timeunit, leaseTime, leaseTimeunit); if (lock) { fn.run(); } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { if (lock && map.isLocked(key)) { map.unlock(key); } } } public Integer getLong(String cacheKey, int delta) { PNCounter counter = hazelcastInstance.getPNCounter(cacheKey); return (int) counter.addAndGet(delta); } public void clearCache(String mapName, List cacheKeys) { Map cacheMap = hazelcastInstance.getMap(mapName); cacheKeys.forEach(cacheMap::remove); } public void clearCache(String mapName, String cacheKey) { Map cacheMap = hazelcastInstance.getMap(mapName); cacheMap.remove(cacheKey); } }