Redis 批量操作 pipeline 模式

程序员大白

共 537字,需浏览 2分钟

 ·

2021-12-12 23:28

点击上方“程序员大白”,选择“星标”公众号

重磅干货,第一时间送达


业务场景

       项目中场景需要get一批key的value,因为redis的get操作(不单单是get命令)是阻塞的,如果循环取值的话,就算是内网,耗时也是巨大的。所以想到了redis的pipeline命令。


pipeline简介

  • 非pipeline:client一个请求,redis server一个响应,期间client阻塞

  • Pipeline:redis的管道命令,允许client将多个请求依次发给服务器(redis的客户端,如jedisCluster,lettuce等都实现了对pipeline的封装),过程中而不需要等待请求的回复,在最后再一并读取结果即可。


单机版

单机版比较简单,批量获取

1//换成真实的redis实例
2Jedis jedis = new Jedis();
3//获取管道
4Pipeline p = jedis.pipelined();
5for (int i = 0; i < 10000; i++) {
6    p.get("key_" + i);
7}
8//获取结果
9List<Object> results = p.syncAndReturnAll();

批量插入

1String key = "key";
2Jedis jedis = new Jedis();
3Pipeline p = jedis.pipelined();
4List<String> cacheData = .... //要插入的数据列表
5for(String data: cacheData ){
6    p.hset(key, data);
7}
8p.sync();
9jedis.close();


集群版

实际上遇到的问题是,项目上所用到的Redis是集群,初始化的时候使用的类是 JedisCluster 而不是 Jedis。去查了 JedisCluster 的文档,并没有发现提供有像 Jedis 一样的获取 Pipeline对象的 pipelined() 方法。解决方案:

Redis 集群规范有说: Redis 集群的键空间被分割为 16384 个槽(slot), 集群的最大节点数量也是 16384 个。每个主节点都负责处理 16384 个哈希槽的其中一部分。当我们说一个集群处于“稳定”(stable)状态时, 指的是集群没有在执行重配置(reconfiguration)操作, 每个哈希槽都只由一个节点进行处理。所以可以根据要插入的 key 知道这个 key 所对应的槽的号码,再通过这个槽的号码从集群中找到对应 Jedis。具体实现如下:

1//初始化得到了jedis cluster, 如何获取HostAndPort集合代码就不写了
2Set nodes = .....
3JedisCluster jedisCluster = new JedisCluster(nodes);
4
5Map<String, JedisPool> nodeMap = jedisCluster.getClusterNodes();
6String anyHost = nodeMap.keySet().iterator().next();
7
8//getSlotHostMap方法在下面有
9TreeMapString
> slotHostMap = getSlotHostMap(anyHost); 

 

 1private static TreeMapString> getSlotHostMap(String anyHostAndPortStr) {
2        TreeMapString> tree = new TreeMapString>();
3        String parts[] = anyHostAndPortStr.split(":");
4        HostAndPort anyHostAndPort = new HostAndPort(parts[0], Integer.parseInt(parts[1]));
5        try{
6            Jedis jedis = new Jedis(anyHostAndPort.getHost(), anyHostAndPort.getPort());
7            List<Object> list = jedis.clusterSlots();
8            for (Object object : list) {
9                List<Object> list1 = (List<Object>) object;
10                List<Object> master = (List<Object>) list1.get(2);
11                String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1);
12                tree.put((Long) list1.get(0), hostAndPort);
13                tree.put((Long) list1.get(1), hostAndPort);
14            }
15            jedis.close();
16        }catch(Exception e){
17
18        }
19        return tree;
20}

 上面这几步可以在初始化的时候就完成。不需要每次都调用, 把nodeMap和slotHostMap都定义为静态变量。

1//获取槽号
2int slot = JedisClusterCRC16.getSlot(key); 
3//获取到对应的Jedis对象
4Map.EntryString
> entry = slotHostMap.lowerEntry(Long.valueOf(slot));
5Jedis jedis = nodeMap.get(entry.getValue()).getResource();

建议上面这步操作可以封装成一个静态方法。比如命名为 public static Jedis getJedisByKey(String key) 之类的。意思就是在集群中, 通过key获取到这个key所对应的Jedis对象。这样再通过上面的 jedis.pipelined(); 来就可以进行批量插入了。以下是一个比较完整的封装

  1import redis.clients.jedis.*;
3import redis.clients.jedis.exceptions.JedisMovedDataException;
4import redis.clients.jedis.exceptions.JedisRedirectionException;
5import redis.clients.util.JedisClusterCRC16;
6import redis.clients.util.SafeEncoder;
7
8import java.io.Closeable;
9import java.lang.reflect.Field;
10import java.util.*;
11import java.util.function.BiConsumer;
12
14public class JedisClusterPipeline extends PipelineBase implements Closeable {
15
16    /**
17     * 用于获取 JedisClusterInfoCache
18     */

19    private JedisSlotBasedConnectionHandler connectionHandler;
20    /**
21     * 根据hash值获取连接
22     */

23    private JedisClusterInfoCache clusterInfoCache;
24
25    /**
26     * 也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口
27     * JedisCluster继承于BinaryJedisCluster
28     * 在BinaryJedisCluster,connectionHandler属性protected修饰的,所以需要反射
29     *
30     *
31     * 而 JedisClusterInfoCache 属性在JedisClusterConnectionHandler中,但是这个类是抽象类,
32     * 但它有一个实现类JedisSlotBasedConnectionHandler
33     */

34    private static final Field FIELD_CONNECTION_HANDLER;
35    private static final Field FIELD_CACHE;
36    static {
37        FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
38        FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
39    }
40
41    /**
42     * 根据顺序存储每个命令对应的Client
43     */

44    private Queue clients = new LinkedList<>();
45    /**
46     * 用于缓存连接
47     * 一次pipeline过程中使用到的jedis缓存
48     */

49    private Map jedisMap = new HashMap<>();
50    /**
51     * 是否有数据在缓存区
52     */

53    private boolean hasDataInBuf = false;
54
55    /**
56     * 根据jedisCluster实例生成对应的JedisClusterPipeline
57     * 通过此方式获取pipeline进行操作的话必须调用close()关闭管道
58     * 调用本类里pipelineXX方法则不用close(),但建议最好还是在finally里调用一下close()
59     * @param
60     * @return
61     */

62    public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
63        JedisClusterPipeline pipeline = new JedisClusterPipeline();
64        pipeline.setJedisCluster(jedisCluster);
65        return pipeline;
66    }
67
68    public JedisClusterPipeline() {
69    }
70
71    public void setJedisCluster(JedisCluster jedis) {
72        connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
73        clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
74    }
75
76    /**
77     * 刷新集群信息,当集群信息发生变更时调用
78     * @param
79     * @return
80     */

81    public void refreshCluster() {
82        connectionHandler.renewSlotCache();
83    }
84
85    /**
86     * 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化
87     */

88    public void sync() {
89        innerSync(null);
90    }
91
92    /**
93     * 同步读取所有数据 并按命令顺序返回一个列表
94     *
95     * @return 按照命令的顺序返回所有的数据
96     */

97    public List syncAndReturnAll() {
98        List responseList = new ArrayList<>();
99
100        innerSync(responseList);
101
102        return responseList;
103    }
104
105    @Override
106    public void close() {
107        clean();
108        clients.clear();
109        for (Jedis jedis : jedisMap.values()) {
110            if (hasDataInBuf) {
111                flushCachedData(jedis);
112            }
113            jedis.close();
114        }
115        jedisMap.clear();
116        hasDataInBuf = false;
117    }
118
119    private void flushCachedData(Jedis jedis) {
120        try {
121            jedis.getClient().getAll();
122        } catch (RuntimeException ex) {
123        }
124    }
125
126    @Override
127    protected Client getClient(String key) {
128        byte[] bKey = SafeEncoder.encode(key);
129        return getClient(bKey);
130    }
131
132    @Override
133    protected Client getClient(byte[] key) {
134        Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
135        Client client = jedis.getClient();
136        clients.add(client);
137        return client;
138    }
139
140    private Jedis getJedis(int slot) {
141        JedisPool pool = clusterInfoCache.getSlotPool(slot);
142        // 根据pool从缓存中获取Jedis
143        Jedis jedis = jedisMap.get(pool);
144        if (null == jedis) {
145            jedis = pool.getResource();
146            jedisMap.put(pool, jedis);
147        }
148        hasDataInBuf = true;
149        return jedis;
150    }
151
152    public static void pipelineSetEx(String[] keys, String[] values, int[] exps,JedisCluster jedisCluster) {
153        operate(new Command() {
154            @Override
155            public List execute() {
156                JedisClusterPipeline p = pipelined(jedisCluster);
157                for (int i = 0, len = keys.length; i < len; i++) {
158                    p.setex(keys[i], exps[i], values[i]);
159                }
160                return p.syncAndReturnAll();
161            }
162        });
163    }
164
165    public static List> pipelineHgetAll(String[] keys,JedisCluster jedisCluster) {
166        return operate(new Command() {
167            @Override
168            public List execute() {
169                JedisClusterPipeline p = pipelined(jedisCluster);
170                for (int i = 0, len = keys.length; i < len; i++) {
171                    p.hgetAll(keys[i]);
172                }
173                return p.syncAndReturnAll();
174            }
175        });
176    }
177
178    public static List pipelineSismember(String[] keys, String members,JedisCluster jedisCluster) {
179        return operate(new Command() {
180            @Override
181            public List execute() {
182                JedisClusterPipeline p = pipelined(jedisCluster);
183                for (int i = 0, len = keys.length; i < len; i++) {
184                    p.sismember(keys[i], members);
185                }
186                return p.syncAndReturnAll();
187            }
188        });
189    }
190
191    public static  List pipeline(BiConsumer function, O obj,JedisCluster jedisCluster) {
192        return operate(new Command() {
193            @Override
194            public List execute() {
195                JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jedisCluster);
196                function.accept(obj, jcp);
197                return jcp.syncAndReturnAll();
198            }
199        });
200    }
201
202    private void innerSync(List formatted) {
203        HashSet clientSet = new HashSet<>();
204        try {
205            for (Client client : clients) {
206                // 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。
207                // 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了
208                Object data = generateResponse(client.getOne()).get();
209                if (null != formatted) {
210                    formatted.add(data);
211                }
212                // size相同说明所有的client都已经添加,就不用再调用add方法了
213                if (clientSet.size() != jedisMap.size()) {
214                    clientSet.add(client);
215                }
216            }
217        } catch (JedisRedirectionException jre) {
218            if (jre instanceof JedisMovedDataException) {
219                // if MOVED redirection occurred, rebuilds cluster's slot cache,
220                // recommended by Redis cluster specification
221                refreshCluster();
222            }
223
224            throw jre;
225        } finally {
226            if (clientSet.size() != jedisMap.size()) {
227                // 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染
228                for (Jedis jedis : jedisMap.values()) {
229                    if (clientSet.contains(jedis.getClient())) {
230                        continue;
231                    }
232                    flushCachedData(jedis);
233                }
234            }
235            hasDataInBuf = false;
236            close();
237        }
238    }
239
240    private static Field getField(Class cls, String fieldName) {
241        try {
242            Field field = cls.getDeclaredField(fieldName);
243            field.setAccessible(true);
244            return field;
245        } catch (NoSuchFieldException | SecurityException e) {
246            throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
247        }
248    }
249
250    @SuppressWarnings({"unchecked" })
251    private static  getValue(Object obj, Field field) {
252        try {
253            return (T)field.get(obj);
254        } catch (IllegalArgumentException | IllegalAccessException e) {
257            throw new RuntimeException(e);
258        }
259    }
260
261    private static  operate(Command command) {
262        try  {
263            return command.execute();
264        } catch (Exception e) {
266            throw new RuntimeException(e);
267        }
268    }
269
270    interface Command {
271        /**
272         * 具体执行命令
273         *
274         * @param 
275         * @return
276         */

277         execute();
278    }
279}

使用例子

 1    public Object testPipelineOperate() {
2        //        String[] keys = {"dylan1","dylan2"};
3        //        String[] values = {"dylan1-v1","dylan2-v2"};
4        //        int[] exps = {100,200};
5        //        JedisClusterPipeline.pipelineSetEx(keys, values, exps, jedisCluster);
6        long start = System.currentTimeMillis();
7
8        List<String> keyList = new ArrayList<>();
9        for (int i = 0; i < 1000; i++) {
10            keyList.add(i + "");
11        }
12        //        List pipeline = JedisClusterPipeline.pipeline(this::getValue, keyList, jedisCluster);
13        //        List pipeline = JedisClusterPipeline.pipeline(this::getHashValue, keyList, jedisCluster);
14        String[] keys = {"dylan-test1""dylan-test2"};
15
16        List<Map<StringString>> all = JedisClusterPipeline.pipelineHgetAll(keys, jedisCluster);
17        long end = System.currentTimeMillis();
18        System.out.println("testPipelineOperate cost:" + (end-start));
19
20        return Response.success(all);
21    }


13个你一定要知道的PyTorch特性

解读:为什么要做特征归一化/标准化?

一文搞懂 PyTorch 内部机制

张一鸣:每个逆袭的年轻人,都具备的底层能力




西[]


浏览 226
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报