??
?
背景:
最近系統內緩存CPU使用率一直報警,超過設置的70%報警閥值,針對此場景,需要對應解決緩存是否有大key使用問題,掃描緩存集群的大key,針對每個key做優化處理。
以下是掃描出來的大key,此處只放置了有效關鍵信息。
??
圖1
大key介紹:
想要解決大key,首先我們得知道什么定義為大key。
什么是大KEY:
大key 并不是指 key 的值很大,而是 key 對應的 value 很大(非常占內存)。此處為中間件給出的定義:
?單個String類型的Key大小達到20KB并且OPS高
?單個String達到100KB
?集合類型的Key總大小達到1MB
?集合類型的Key中元素超過5000個
大KEY帶來的影響:
知道了大key的定義,那么我們也得知道大key的帶來的影響:
?客戶端超時阻塞。 Redis 執行命令是單線程處理,然后在大 key處理時會比較耗時,那么就會發生阻塞 ,期間就會各種業務超時出現。
?引發網絡阻塞。每次獲取大 key 產生的網絡流量較大,如果一個 key 的大小是 1 MB,每秒訪問量為 1000,那么每秒會產生 1000MB 的流量,這對于服務器來說是災難性的。
?阻塞工作線程。如果使用 del 刪除大 key 時,會阻塞工作線程,無法處理后續的命令。
?內存分布不均。集群各分片內存使用不均。某個分片占用內存較高或OOM,發送緩存區增大等,導致該分片其他Key被逐出,同時也會造成其他分片的資源浪費。
大KEY解決手段:
1、歷史key未使用
場景描述:
針對這種key場景,其實存在著歷史原因,可能是伴隨著某個業務下線或者不使用,往往對應實現的緩存操作代碼會刪除,但是對于緩存數據往往不會做任何處理,久而久之,這種臟數據會一直堆積,占用著資源。那么如果確定已經無使用,并且可以確認有持久化數據(如mysql、es等)備份的話,可以直接將對應key刪除。
實例經驗:
如圖1上面的元素個數488649,其實整個系統查看了下,沒有使用的地方,最近也沒有訪問,相信也是因為一直沒有用到, 否則系統內一旦用了這個key來操作hgetall、smembers等,那么緩存服務應該就會不可用了。
2、元素數過多
場景描述:
針對于Set、HASH這種場景,如果元素數量超過5000就視為大的key,以上面圖1為例,可以看到元素個數有的甚至達到了1萬以上。針對這種的如果對應value值不大,我們可以采取平鋪的形式,
實例經驗:
比如系統內歷史的設計是存儲下每個品牌對應的名稱,那么就設置了統一的key,然后不同的品牌id作為fild,操作了hSet和hGet來存儲獲取數據,降低查詢外圍服務的頻率。但是隨著品牌數量的增長,導致元素逐步增多,元素個數就超過了大key的預設值了。這種根據場景,我們其實存儲本身只有一個品牌名稱,那么我們就針對于品牌id對應加上一個統一前綴作為唯一key,采用平鋪方式緩存對應數據即可。那么針對這種數據的替換,我這里也總結了下具體要實現的步驟:
修改代碼查詢和賦值邏輯:
?把原始的hGet的邏輯修改為get獲?。?/p>
?把原始hSet的邏輯修改為set賦值。
歷史數據刷新到新緩存key:
為了避免上線之后出現緩存雪崩,因為替換了新的key,我們需要通過現有的HASH的數據刷新到新的緩存中,所以需要歷史數據處理。
通過hGetAll獲取所以元素數據
循環緩存元素數據操作存儲新的緩存key和value。
public String refreshHistoryData(){ try { String key = "historyKey"; Map redisInfoMap= redisUtils.hGetAll(key); if (redisInfoMap.isEmpty()){ return "查詢緩存無數據"; } for (Map.Entry entry : redisInfoMap.entrySet()) { String redisVal = entry.getValue(); String filedKey = entry.getKey(); String newDataRedisKey = "newDataKey"+filedKey; redisUtils.set(newDataRedisKey,redisVal); } return "success"; }catch (Exception e){ LOG.error("refreshHistoryData 異常:",e); } return "failed"; }
注意:這里一定要先刷歷史數據,再上線代碼業務邏輯的修改。防止引發緩存雪崩
3、大對象轉換存儲形式
場景描述:
復雜的大對象可以嘗試將對象分拆成幾個key-value, 使用mGet和mSet操作對應值或者pipeline的形式,最后拼裝成需要返回的大對象。這樣意義在于可以分散單次操作的壓力,將操作壓力平攤到多個redis實例中,降低對單個redis的IO影響;
實例經驗:
這里以系統內訂單對象為例:訂單對象Order基礎屬性有幾十個,如訂單號、金額、時間、類型等,除此之外還要包含訂單下的商品OrderSub、預售信息PresaleOrder、發票信息OrderInvoice、訂單時效OrderPremiseInfo、訂單軌跡OrderTrackInfo、訂單詳細費用OrderFee等信息。
那么對于每個訂單相關信息,我們可以設置為單獨的key,把訂單信息和幾個相關的關聯數據每個按照單獨key存儲,接著通過mGet方式獲取每個信息之后,最后封裝成整體Order對象。下面僅展示關鍵偽代碼以mSet和mGet實現:
緩存定義:
public enum CacheKeyConstant { /** * 訂單基礎緩存key */ REDIS_ORDER_BASE_INFO("ORDER_BASE_INFO"), /** * 訂單商品緩存key */ ORDER_SUB_INFO("ORDER_SUB_INFO"), /** * 訂單預售信息緩存key */ ORDER_PRESALE_INFO("ORDER_PRESALE_INFO"), /** * 訂單履約信息緩存key */ ORDER_PREMISE_INFO("ORDER_PREMISE_INFO"), /** * 訂單發票信息緩存key */ ORDER_INVOICE_INFO("ORDER_INVOICE_INFO"), /** * 訂單軌跡信息緩存key */ ORDER_TRACK_INFO("ORDER_TRACK_INFO"), /** * 訂單詳細費用信息緩存key */ ORDER_FEE_INFO("ORDER_FEE_INFO"), ; /** * 前綴 */ private String prefix; /** * 項目統一前綴 */ public static final String COMMON_PREFIX = "XXX"; CacheKeyConstant(String prefix){ this.prefix = prefix; } public String getPrefix(String subKey) { if(StringUtil.isNotEmpty(subKey)){ return COMMON_PREFIX + prefix + "_" + subKey; } return COMMON_PREFIX + prefix; } public String getPrefix() { return COMMON_PREFIX + prefix; } }
緩存存儲:
/** * @description 刷新訂單到緩存 * @param order 訂單信息 */ public boolean refreshOrderToCache(Order order){ if(order == null || order.getOrderId() == null){ return ; } String orderId = order.getOrderId().toString(); //設置存儲緩存數據 Map cacheOrderMap = new HashMap?>(16); cacheOrderMap.put(CacheKeyConstant.ORDER_BASE_INFO.getPrefix(orderId), JSON.toJSONString(buildBaseOrderVo(order))); cacheOrderMap.put(CacheKeyConstant.ORDER_SUB_INFO.getPrefix(orderId), JSON.toJSONString(order.getCustomerOrderSubs())); cacheOrderMap.put(CacheKeyConstant.ORDER_PRESALE_INFO.getPrefix(orderId), JSON.toJSONString(order.getPresaleOrderData())); cacheOrderMap.put(CacheKeyConstant.ORDER_INVOICE_INFO.getPrefix(orderId), JSON.toJSONString(order.getOrderInvoice())); cacheOrderMap.put(CacheKeyConstant.ORDER_TRACK_INFO.getPrefix(orderId), JSON.toJSONString(order.getOrderTrackInfo())); cacheOrderMap.put(CacheKeyConstant.ORDER_PREMISE_INFO.getPrefix(orderId), JSON.toJSONString( order.getPresaleOrderData())); cacheOrderMap.put(CacheKeyConstant.ORDER_FEE_INFO.getPrefix(orderId), JSON.toJSONString(order.getOrderFeeVo())); superRedisUtils.mSetString(cacheOrderMap); }
緩存獲取:
/** * @description 通過訂單號獲取緩存數據 * @param orderId 訂單號 * @return Order 訂單實體信息 */ public Order getOrderFromCache(String orderId){ if(StringUtils.isBlank(orderId)){ return null; } //定義查詢緩存集合key List queryOrderKey = Arrays.asList(CacheKeyConstant.ORDER_BASE_INFO.getPrefix(orderId),CacheKeyConstant.ORDER_SUB_INFO.getPrefix(orderId), CacheKeyConstant.ORDER_PRESALE_INFO.getPrefix(orderId),CacheKeyConstant.ORDER_INVOICE_INFO.getPrefix(orderId),CacheKeyConstant.ORDER_TRACK_INFO.getPrefix(orderId), CacheKeyConstant.ORDER_PREMISE_INFO.getPrefix(orderId),CacheKeyConstant.ORDER_FEE_INFO.getPrefix(orderId)); //查詢結果 List result = redisUtils.mGet(queryOrderKey); //基礎信息 if(CollectionUtils.isEmpty(result)){ return null; } String[] resultInfo = result.toArray(new String[0]); //基礎信息 if(StringUtils.isBlank(resultInfo[0])){ return null; } BaseOrderVo baseOrderVo = JSON.parseObject(resultInfo[0],BaseOrderVo.class); Order order = coverBaseOrderVoToOrder(baseOrderVo); //訂單商品 if(StringUtils.isNotBlank(resultInfo[1])){ List orderSubs =JSON.parseObject(result.get(1), new TypeReference>(){}); order.setCustomerOrderSubs(orderSubs); } //訂單預售 if(StringUtils.isNotBlank(resultInfo[2])){ PresaleOrderData presaleOrderData = JSON.parseObject(resultInfo[2],PresaleOrderData.class); order.setPresaleOrderData(presaleOrderData); } //訂單發票 if(StringUtils.isNotBlank(resultInfo[3])){ OrderInvoice orderInvoice = JSON.parseObject(resultInfo[3],OrderInvoice.class); order.setOrderInvoice(orderInvoice); } //訂單軌跡 if(StringUtils.isNotBlank(resultInfo[5])){ OrderTrackInfo orderTrackInfo = JSON.parseObject(resultInfo[5],OrderTrackInfo.class); order.setOrderTrackInfo(orderTrackInfo); } //訂單履約信息 if(StringUtils.isNotBlank(resultInfo[6])){ List orderPremiseInfos =JSON.parseObject(result.get(6), new TypeReference>(){}); order.setPremiseInfos(orderPremiseInfos); } //訂單費用明細信息 if(StringUtils.isNotBlank(resultInfo[7])){ OrderFeeVo orderFeeVo = JSON.parseObject(resultInfo[7],OrderFeeVo.class); order.setOrderFeeVo(orderFeeVo); } return order; }
注意:獲取緩存的結果跟傳入的key的順序保持對應即可。
緩存util方法封裝:
/** * * @description 同時將多個 key-value (域-值)對設置到緩存中。 * @param mappings 需要插入的數據信息 */ public void mSetString(Map mappings) { CallerInfo callerInfo = Ump.methodReg(UmpKeyConstants.REDIS.REDIS_STATUS_READ_MSET); try { redisClient.getClientInstance().mSetString(mappings); } catch (Exception e) { Ump.funcError(callerInfo); }finally { Ump.methodRegEnd(callerInfo); } } /** * * @description 同時將多個key的結果返回。 * @param queryKeys 查詢的緩存key集合 */ public List mGet(List queryKeys) { CallerInfo callerInfo = Ump.methodReg(UmpKeyConstants.REDIS.REDIS_STATUS_READ_MGET); try { return redisClient.getClientInstance().mGet(queryKeys.toArray(new String[0])); } catch (Exception e) { Ump.funcError(callerInfo); }finally { Ump.methodRegEnd(callerInfo); } return new ArrayList(queryKeys.size()); }
這里附上通過pipeline的util封裝,可參考。
/** * @description pipeline放松查詢數據 * @param redisKeyList * @return java.util.List */ public List getValueByPipeline(List redisKeyList) { if(CollectionUtils.isEmpty(redisKeyList)){ return null; } List resultInfo = new ArrayList?>(redisKeyList); CallerInfo callerInfo = Ump.methodReg(UmpKeyConstants.REDIS.REDIS_STATUS_READ_GET); try { PipelineClient pipelineClient = redisClient.getClientInstance().pipelineClient(); //添加批量查詢任務 List futures = new ArrayList?>(); redisKeyList.forEach(redisKey -> { futures.add(pipelineClient.get(redisKey.getBytes())); }); //處理查詢結果 pipelineClient.flush(); //可以等待future的返回結果,來判斷命令是否成功。 for (JimFuture future : futures) { resultInfo.add(new String((byte[])future.get())); } } catch (Exception e) { log.error("getValueByPipeline error:",e); Ump.funcError(callerInfo); return new ArrayList?>(redisKeyList.size()); }finally { Ump.methodRegEnd(callerInfo); } return resultInfo; }
注意:Pipeline不建議用來設置緩存值,因為本身不是原子性的操作。
4、壓縮存儲數據
壓縮方法結果:
單個元素時:
??
壓縮方法 | 壓縮前大小Byte | 壓縮后大小Byte | 壓縮耗時 | 解壓耗時 | 壓縮解壓后比對結果 |
DefaultOutputStream | 446(0.43kb) | 254 (0.25kb) | 1ms | 0ms | 相同 |
GzipOutputStream | 446(0.43kb) | 266 (0.25kbM) | 1ms | 1ms | 相同 |
ZlibCompress | 446(0.43kb) | 254 (0.25kb) | 1ms | 0ms | 相同 |
四百個元素集合:
??
壓縮方法 | 壓縮前大小Byte | 壓縮后大小Byte | 壓縮耗時 | 解壓耗時 | 壓縮解壓后比對結果 |
DefaultOutputStream | 6732(6.57kb) | 190 (0.18kb) | 2ms | 0ms | 相同 |
GzipOutputStream | 6732(6.57kb) | 202 (0.19kb) | 1ms | 1ms | 相同 |
ZlibCompress | 6732(6.57kb) | 190 (0.18kb) | 1ms | 0ms | 相同 |
四萬個元素集合時:
??
壓縮方法 | 壓縮前大小Byte | 壓縮后大小Byte | 壓縮耗時 | 解壓耗時 | 壓縮解壓后比對結果 |
DefaultOutputStream | 640340(625kb) | 1732 (1.69kb) | 37ms | 2ms | 相同 |
GzipOutputStream | 640340(625kb) | 1744 (1.70kb) | 11ms | 3ms | 相同 |
ZlibCompress | 640340(625kb) | 1732 (1.69kb) | 69ms | 2ms | 相同 |
壓縮代碼樣例
DefaultOutputStream
public static byte[] compressToByteArray(String text) throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); Deflater deflater = new Deflater(); DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(outputStream, deflater); deflaterOutputStream.write(text.getBytes()); deflaterOutputStream.close(); return outputStream.toByteArray(); }
public static String decompressFromByteArray(byte[] bytes) throws IOException { ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); Inflater inflater = new Inflater(); InflaterInputStream inflaterInputStream = new InflaterInputStream(inputStream, inflater); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; int length; while ((length = inflaterInputStream.read(buffer)) != -1) { outputStream.write(buffer, 0, length); } inflaterInputStream.close(); outputStream.close(); byte[] decompressedData = outputStream.toByteArray(); return new String(decompressedData); }
GZIPOutputStream
public static byte[] compressGzip(String str) { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); GZIPOutputStream gzipOutputStream = null; try { gzipOutputStream = new GZIPOutputStream(outputStream); } catch (IOException e) { throw new RuntimeException(e); } try { gzipOutputStream.write(str.getBytes("UTF-8")); } catch (IOException e) { throw new RuntimeException(e); }finally { try { gzipOutputStream.close(); } catch (IOException e) { throw new RuntimeException(e); } } return outputStream.toByteArray(); }
public static String decompressGzip(byte[] compressed) throws IOException { ByteArrayInputStream inputStream = new ByteArrayInputStream(compressed); GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; int length; while ((length = gzipInputStream.read(buffer)) > 0) { outputStream.write(buffer, 0, length); } gzipInputStream.close(); outputStream.close(); return outputStream.toString("UTF-8"); }
ZlibCompress
public byte[] zlibCompress(String message) throws Exception { String chatacter = "UTF-8"; byte[] input = message.getBytes(chatacter); BigDecimal bigDecimal = BigDecimal.valueOf(0.25f); BigDecimal length = BigDecimal.valueOf(input.length); byte[] output = new byte[input.length + 10 + new Double(Math.ceil(Double.parseDouble(bigDecimal.multiply(length).toString()))).intValue()]; Deflater compresser = new Deflater(); compresser.setInput(input); compresser.finish(); int compressedDataLength = compresser.deflate(output); compresser.end(); return Arrays.copyOf(output, compressedDataLength); }
public static String zlibInfCompress(byte[] data) { String s = null; Inflater decompresser = new Inflater(); decompresser.reset(); decompresser.setInput(data); ByteArrayOutputStream o = new ByteArrayOutputStream(data.length); try { byte[] buf = new byte[1024]; while (!decompresser.finished()) { int i = decompresser.inflate(buf); o.write(buf, 0, i); } s = o.toString("UTF-8"); } catch (Exception e) { e.printStackTrace(); } finally { try { o.close(); } catch (IOException e) { e.printStackTrace(); } } decompresser.end(); return s; }
可以看到壓縮效率比較好,壓縮效率可以從幾百kb壓縮到幾kb內;當然也是看具體場景。不過這里就是最好是避免調用量大的場景使用,畢竟解壓和壓縮數據量大會比較耗費cpu性能。如果是黃金鏈路使用,還需要具體配合壓測,對比前后接口性能。
5、替換存儲方案
如果數據量龐大,那么其實本身是不是就不太適合redis這種緩存存儲了??梢钥紤]es或者mongo這種文檔式存儲結構,存儲大的數據格式。
總結:
redis緩存的使用是一個支持業務和功能高并發的很好的使用方案,但是隨著使用場景的多樣性以及數據的增加,可能逐漸的會出現大key,日常使用中都可以注意以下幾點:
1.分而治之:如果需要存儲大量的數據,避免直接放到緩存中??梢詫⑵洳鸱殖啥鄠€小的value。就像是咱們日常吃飯,盛到碗里,一口一口的吃,俗話說的好呀:“細嚼慢咽”。
2.避免使用不必要的數據結構。例如,如果只需要存儲一個字符串結構的數據,就不要過度設計,使用Hash或者List等數據結構。
3.定期清理過期的key。如果Redis中存在大量的過期key,就會導致Redis的性能下降,或者場景非必要以緩存來持久存儲的,可以添加過期時間,定時清理過期的key,就像是家中的日常垃圾類似,定期的清潔和打掃,居住起來咱們才會更加舒服和方便。
4.對象壓縮。將大的數據壓縮成更小的數據,也是一種好的解決方案,不過要注意壓縮和解壓的頻率,畢竟是比較耗費cpu的。
以上是我根據現有實際場景總結出的一些解決手段,記錄了這些大key的優化經驗,希望可以在日常場景中幫助到大家。大家有其他的好的經驗,也可以分享出來。
審核編輯 黃宇
-
緩存
+關注
關注
1文章
245瀏覽量
27040 -
key
+關注
關注
0文章
53瀏覽量
13028 -
Redis
+關注
關注
0文章
384瀏覽量
11316
發布評論請先 登錄
基于javaPoet的緩存key優化實踐

你總得知道你為什么要用Cortex-M
硬盤緩存有什么用
區塊鏈的事實你知道哪一些
CPU緩存是什么意思_CPU緩存有什么作用
你知道開關電源布局以及印制板布線的一些原則嗎

評論