CompletableFuture: Keeping Execution Time Under Control
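Problem
There is a message-sending interface, MessageService.send(String message); each call takes 2 ms.
Based on this interface, implement a batch-sending interface, MessageService.batchSend(List<String> messages).
The requirements are:
1) A single batch contains at most 100 messages.
2) A single call to the batch interface takes no more than 50 ms.
3) The call must return whether the messages were sent successfully.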
Solution
Approach: split the list into chunks, submit one task per chunk, then wait until all tasks have finished.
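The chunk size follows from the time budget. Sent one after another, 100 messages at 2 ms each take about 200 ms, so the work has to run in parallel, and a chunk that is processed sequentially must not exceed 50 / 2 = 25 messages. The sketch below does this arithmetic. ChunkSizing and its method names are illustrative and not part of the original code; the 2 ms and 50 ms figures come from the problem statement.

```java
class ChunkSizing {
    /** Largest number of messages one worker can send sequentially within the budget. */
    static int maxChunkSize(int budgetMs, int perMessageMs) {
        return budgetMs / perMessageMs;
    }

    /** Number of chunks needed to cover all messages (ceiling division). */
    static int chunksNeeded(int messageCount, int chunkSize) {
        return (messageCount + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        int chunkSize = 20; // below the limit of 25, leaving headroom for scheduling
        System.out.println("max chunk size: " + maxChunkSize(50, 2));
        System.out.println("chunks for 100 messages: " + chunksNeeded(100, chunkSize));
        System.out.println("estimated ms per chunk: " + chunkSize * 2);
    }
}
```

With chunks of 20, each worker needs about 40 ms. The estimate only holds if at least five threads are free to run the chunks concurrently.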
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.1-jre</version>
    </dependency>
Guava (Lists.partition) is used to split the list.
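If adding Guava only for this one call is undesirable, the same split can be done with List.subList from the JDK. A minimal sketch; the Partitions class and its partition method are hypothetical helpers, not part of the original code:

```java
import java.util.ArrayList;
import java.util.List;

class Partitions {
    /** Splits list into consecutive sublists of at most size elements each. */
    static <T> List<List<T>> partition(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < list.size(); from += size) {
            int to = Math.min(from + size, list.size());
            chunks.add(list.subList(from, to)); // a view of the original list, not a copy
        }
        return chunks;
    }
}
```

The sublists are views, so the source list should not be structurally modified while the chunks are in use.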
Code:
    package com.fang.other;

    import com.google.common.collect.Lists;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class Test {

        private static final ThreadPoolExecutor executor =
                new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        public static void main(String[] args) {
            List<String> list = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                list.add(String.valueOf(i));
            }

            long start = System.currentTimeMillis();
            boolean ok = batchSend(list);
            long end = System.currentTimeMillis();
            System.out.println("Result: " + ok + ", elapsed time (ms): " + (end - start));

            // Core pool threads are non-daemon; without shutdown the JVM would not exit.
            executor.shutdown();
        }

        public static boolean batchSend(List<String> messages) {
            // Requirement 1: at most 100 messages per batch.
            if (messages.size() > 100) {
                throw new IllegalArgumentException("A batch may contain at most 100 messages");
            }

            // Chunks of 20: each chunk takes about 20 * 2 ms = 40 ms, and the chunks run in parallel.
            List<List<String>> partition = Lists.partition(messages, 20);
            List<CompletableFuture<Boolean>> futures = new ArrayList<>();
            for (List<String> one : partition) {
                futures.add(CompletableFuture.supplyAsync(() -> {
                    for (String res : one) {
                        if (!send(res)) {
                            return false;
                        }
                    }
                    return true;
                }, executor));
            }

            // Wait for every chunk to finish.
            try {
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }

            // Requirement 3: the batch succeeds only if every chunk succeeded.
            for (CompletableFuture<Boolean> future : futures) {
                if (!future.join()) {
                    return false;
                }
            }
            return true;
        }

        // Simulates the real send call: about 2 ms per message.
        public static boolean send(String message) {
            try {
                Thread.sleep(2);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
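The listing relies on the chunk size to stay under 50 ms but does not enforce the limit. One way to make the budget explicit is to wait with a timeout and report failure when it expires. The following is a sketch using only the JDK. TimedBatch, the pool of 5 daemon threads, the stand-in send method, and the decision to return false on timeout are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedBatch {
    // Daemon threads so that this sketch never keeps the JVM alive.
    static final ExecutorService POOL = Executors.newFixedThreadPool(5, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    /** Stand-in for the real send call: about 2 ms per message. */
    static boolean send(String message) {
        try {
            Thread.sleep(2);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns true only if every message was sent and all chunks finished within timeoutMs. */
    static boolean batchSend(List<String> messages, int chunkSize, long timeoutMs) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (int from = 0; from < messages.size(); from += chunkSize) {
            List<String> chunk = messages.subList(from, Math.min(from + chunkSize, messages.size()));
            // allMatch stops at the first failed send, like the loop in the original listing.
            futures.add(CompletableFuture.supplyAsync(
                    () -> chunk.stream().allMatch(TimedBatch::send), POOL));
        }
        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            all.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Marks the futures as cancelled; work that is already running is not interrupted.
            futures.forEach(f -> f.cancel(true));
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
        return futures.stream().allMatch(CompletableFuture::join);
    }
}
```

A call such as TimedBatch.batchSend(messages, 20, 50) then returns false instead of overrunning the budget. Note that CompletableFuture.cancel does not interrupt a task that is already executing, so messages in a timed-out chunk may still be sent in the background.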
Consistent Hashing
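First, define a node class that implements the data-node functionality. The ConsistentHash class below builds the hash ring on top of it: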
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.TreeMap;

    public class ConsistentHash {
        // Hash ring: virtual-node hash -> physical node, kept sorted by hash.
        private final TreeMap<Integer, Node> hashRing = new TreeMap<>();
        public List<Node> nodeList = new ArrayList<>();

        public void addNode(String ip) {
            Objects.requireNonNull(ip);
            Node node = new Node(ip);
            nodeList.add(node);
            for (Integer virtualNodeHash : node.getVirtualNodeHashes()) {
                hashRing.put(virtualNodeHash, node);
                System.out.println("Virtual node [" + node + "] hash: " + virtualNodeHash + " added");
            }
        }

        public void removeNode(Node node) {
            nodeList.remove(node);
            // Also take the node's virtual nodes off the ring; otherwise lookups would still route to it.
            for (Integer virtualNodeHash : node.getVirtualNodeHashes()) {
                hashRing.remove(virtualNodeHash, node);
            }
        }

        public Object get(Object key) {
            Node node = findMatchNode(key);
            System.out.println("Resolved node: " + node.getIp());
            return node.getCacheItem(key);
        }

        public void put(Object key, Object value) {
            Node node = findMatchNode(key);
            node.addCacheItem(key, value);
        }

        public void evict(Object key) {
            findMatchNode(key).removeCacheItem(key);
        }

        // Finds the first virtual node at or after the key's hash, wrapping to the start of the ring.
        private Node findMatchNode(Object key) {
            Map.Entry<Integer, Node> entry = hashRing.ceilingEntry(HashUtils.hashcode(key));
            if (entry == null) {
                entry = hashRing.firstEntry();
            }
            return entry.getValue();
        }
    }
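ConsistentHash depends on a Node class whose listing does not appear above. The following is a minimal sketch of what such a class could look like, inferred only from the calls ConsistentHash makes (getIp, getVirtualNodeHashes, addCacheItem, getCacheItem, removeCacheItem). The virtual-node count of 8, the "ip#i" naming scheme, and the use of String.hashCode() for placement are assumptions; the original implementation may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Node {
    private static final int VIRTUAL_NODE_COUNT = 8; // assumed value

    private final String ip;
    private final List<Integer> virtualNodeHashes = new ArrayList<>();
    private final Map<Object, Object> cacheMap = new HashMap<>();

    public Node(String ip) {
        this.ip = ip;
        for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
            // Placeholder hash (assumption); in practice use the same function the ring uses for keys.
            virtualNodeHashes.add((ip + "#" + i).hashCode() & Integer.MAX_VALUE);
        }
    }

    public String getIp() { return ip; }

    public List<Integer> getVirtualNodeHashes() { return virtualNodeHashes; }

    public void addCacheItem(Object key, Object value) { cacheMap.put(key, value); }

    public Object getCacheItem(Object key) { return cacheMap.get(key); }

    public void removeCacheItem(Object key) { cacheMap.remove(key); }

    @Override
    public String toString() {
        return "Node{ip=" + ip + ", cachedItems=" + cacheMap.size() + "}";
    }
}
```

String.hashCode() places similar strings close together, so a better-mixing function such as HashUtils.hashcode would spread the virtual nodes more evenly around the ring.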
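As shown above, TreeMap's ceilingEntry() method finds the next server node in the clockwise direction; when no entry lies at or after the key's hash, the lookup wraps around to firstEntry().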
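The lookup rule is easiest to see on a tiny ring with hand-picked positions. The sketch below uses plain strings instead of Node objects; RingLookupDemo and the positions 100, 200, and 300 are illustrative only.

```java
import java.util.Map;
import java.util.TreeMap;

class RingLookupDemo {
    /** Returns the owner of hash: the first ring entry at or after it, wrapping to the start. */
    static String lookup(TreeMap<Integer, String> ring, int hash) {
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash);
        if (entry == null) {
            entry = ring.firstEntry(); // past the last position: wrap around
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        ring.put(100, "A");
        ring.put(200, "B");
        ring.put(300, "C");

        System.out.println(lookup(ring, 150)); // B: the next position clockwise is 200
        System.out.println(lookup(ring, 200)); // B: ceilingEntry includes an exact match
        System.out.println(lookup(ring, 301)); // A: nothing at or after 301, so wrap
    }
}
```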
The hash computation is a common one, and many other hash functions can be found online. Sample code:
    public class HashUtils {

        // FNV-style hash of obj.toString(), followed by extra bit mixing.
        public static int hashcode(Object obj) {
            final int p = 16777619;        // 32-bit FNV prime
            int hash = (int) 2166136261L;  // 32-bit FNV offset basis
            String str = obj.toString();
            for (int i = 0; i < str.length(); i++) {
                hash = (hash ^ str.charAt(i)) * p;
            }
            hash += hash << 13;
            hash ^= hash >> 7;
            hash += hash << 3;
            hash ^= hash >> 17;
            hash += hash << 5;

            // Math.abs(Integer.MIN_VALUE) is still negative, so handle that value explicitly.
            if (hash == Integer.MIN_VALUE) {
                return 0;
            }
            return Math.abs(hash);
        }
    }
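One detail worth knowing when forcing a hash to be non-negative: Math.abs(Integer.MIN_VALUE) overflows and returns Integer.MIN_VALUE, which is still negative. A TreeMap-based ring tolerates negative keys, so this is mostly a matter of keeping the hash range predictable. The sketch shows two common alternatives; the class and method names are illustrative.

```java
class NonNegativeHash {
    /** Clears the sign bit; the result is always in [0, Integer.MAX_VALUE]. */
    static int bySignMask(int hash) {
        return hash & Integer.MAX_VALUE;
    }

    /** Keeps Math.abs semantics but maps the one problematic input to 0. */
    static int byGuardedAbs(int hash) {
        return hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE) < 0);  // true
        System.out.println(bySignMask(Integer.MIN_VALUE));    // 0
        System.out.println(byGuardedAbs(Integer.MIN_VALUE));  // 0
        System.out.println(byGuardedAbs(-42));                // 42
    }
}
```

The two methods agree on non-negative inputs but map negative inputs to different values, so a ring should use one of them consistently.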
With the consistent hashing algorithm implemented, the next step is to add a test class that verifies it works correctly. Sample code:
    // RandomStringUtils and RandomUtils come from Apache Commons Lang 3.
    import org.apache.commons.lang3.RandomStringUtils;
    import org.apache.commons.lang3.RandomUtils;

    import java.util.ArrayList;
    import java.util.List;

    public class ConsistentHashTest {
        public static final int NODE_SIZE = 10;
        public static final int STRING_COUNT = 100 * 100;
        private static final ConsistentHash consistentHash = new ConsistentHash();
        private static final List<String> sList = new ArrayList<>();

        public static void main(String[] args) {
            // Add the initial nodes.
            for (int i = 0; i < NODE_SIZE; i++) {
                consistentHash.addNode("10.2.1." + i);
            }

            // Generate random keys.
            for (int i = 0; i < STRING_COUNT; i++) {
                sList.add(RandomStringUtils.randomAlphanumeric(10));
            }

            // Store every key on the ring.
            for (String s : sList) {
                consistentHash.put(s, s);
            }

            printSamplesAndNodes();

            // Add one more node. Keys that now map to it were stored on another node,
            // so lookups for those keys can miss.
            consistentHash.addNode("10.2.1.110");
            printSamplesAndNodes();
        }

        // Reads back 10 random keys, then prints every node.
        private static void printSamplesAndNodes() {
            for (int i = 0; i < 10; i++) {
                int index = RandomUtils.nextInt(0, STRING_COUNT);
                String key = sList.get(index);
                String cache = (String) consistentHash.get(key);
                System.out.println("Random:" + index + ",key:" + key
                        + ",consistentHash get value:" + cache + ",value is:" + key.equals(cache));
            }
            for (Node node : consistentHash.nodeList) {
                System.out.println(node);
            }
        }
    }
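ConsistentHashTest samples ten random keys. As a complement, the sketch below counts how many keys change owner when a server is added, and checks that every key that moves lands on the new server. It is self-contained and does not use the classes above: RemapDemo, the 100 virtual nodes per server, the "ip#i" naming, and the deterministic "key-N" keys are assumptions for illustration, and the hash function only mirrors the structure of HashUtils.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RemapDemo {
    static final int VIRTUAL_NODES = 100; // assumed number of virtual nodes per server

    /** FNV-style hash with extra mixing, folded to a non-negative int. */
    static int hash(String s) {
        int h = (int) 2166136261L;
        for (int i = 0; i < s.length(); i++) {
            h = (h ^ s.charAt(i)) * 16777619;
        }
        h += h << 13;
        h ^= h >> 7;
        h += h << 3;
        h ^= h >> 17;
        h += h << 5;
        return h & Integer.MAX_VALUE;
    }

    static void addServer(TreeMap<Integer, String> ring, String ip) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.put(hash(ip + "#" + i), ip);
        }
    }

    static String owner(TreeMap<Integer, String> ring, String key) {
        Map.Entry<Integer, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    /** Returns {keys that changed owner, keys that moved to a server other than the new one}. */
    static int[] measure(int servers, int keys, String newServer) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        for (int i = 0; i < servers; i++) {
            addServer(ring, "10.2.1." + i);
        }
        List<String> before = new ArrayList<>();
        for (int k = 0; k < keys; k++) {
            before.add(owner(ring, "key-" + k));
        }

        addServer(ring, newServer);

        int moved = 0;
        int movedElsewhere = 0;
        for (int k = 0; k < keys; k++) {
            String now = owner(ring, "key-" + k);
            if (!now.equals(before.get(k))) {
                moved++;
                if (!now.equals(newServer)) {
                    movedElsewhere++;
                }
            }
        }
        return new int[] {moved, movedElsewhere};
    }

    public static void main(String[] args) {
        int[] result = measure(10, 10_000, "10.2.1.110");
        System.out.println("keys that changed owner: " + result[0] + " of 10000");
        System.out.println("of those, moved to one of the old servers: " + result[1]);
    }
}
```

The second number is always 0: adding a server only inserts new positions on the ring, so a key either keeps its owner or moves to the new server. With eleven servers sharing the ring evenly, roughly one key in eleven would be expected to move; the actual count depends on the hash function and the number of virtual nodes.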
Running ConsistentHashTest produces the following output:
