java - Redis 在运行时获取命令不可预测的结果

标签 java redis

我有一个小程序,它通过加密交换建立 web 套接字连接,接收数据并使用 set Redis 命令保存它。

代码

//get Redis connection
    RedisAsyncCommands<String, String> redis = TRedis.getRedis();
    String symbol = "AGIETH";
    Session session = null;
    try {
        //Open websocket connection. 
        session = (new 
  BinanceApi()).websocketTrades(BinanceSymbol.valueOf(symbol), new BinanceWebSocketAdapterAggTrades() {
            @Override
            public void onMessage(BinanceEventAggTrade message) {
                double closeOrderBuy = 0;
                double closeOrderSell = 0;
                //check if we  saved order information before and if yes get data from Redis
                try {
                    if(redis.get(symbol+"Buy").get()!=null )
                    {
                        closeOrderBuy = Double.valueOf(redis.get(symbol+"Buy").get());
                    }
                    if( redis.get(symbol+"Sell").get()!=null)
                    {
                        closeOrderSell = Double.valueOf(redis.get(symbol+"Sell").get());
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                // get current value from exchange
                double currentCloseOrder = message.getPrice().multiply(message.getQuantity()).doubleValue();

                // rewrite data in Redis
                if(message.isMaker()) {
                    closeOrderBuy = currentCloseOrder + closeOrderBuy;
                    redis.set(symbol + "Buy",String.valueOf(closeOrderBuy));
                }
                else {
                    closeOrderSell = currentCloseOrder + closeOrderSell;
                    redis.set(symbol + "Sell",String.valueOf(closeOrderSell));
                }
            }
        });
    } catch (BinanceApiException e) {
        e.printStackTrace();
    }
    try {
        Thread.sleep(10000);
        session.close();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
//check what we write
    try {
        System.out.println(symbol + redis.get(symbol + "Buy").get() + "  " + redis.get(symbol + "Sell").get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

这是 AGIETH 对的控制台输出的一部分:

18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandEncoder - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]  
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379, chid=0x1] Received: 5 bytes, 1 commands in the stack  
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decode AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]  
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decoded AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], empty stack: true  
***AGIETH 0.045027  null***

AGIBTC 对的输出

13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandEncoder - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379, chid=0x1] Received: 25 bytes, 1 commands in the stack
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decode AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decoded AsyncCommand [type=GET, output=ValueOutput [output=0.5475444299999999, error='null'], commandType=io.lettuce.core.protocol.Command], empty stack: true  
**AGIBTC 0.20769342999999998  0.5475444299999999**

我收到一些对的 Null,但交换提供了此信息。不明白是不是Redis的问题,还是我程序的逻辑有问题?

最佳答案

我发现您的代码有两个问题:

  1. 您使用 Redis 异步 API,但没有正确等待 set() 操作完成。 RedisAsyncCommands.set 返回 RedisFuture。在调用 get() 之前,您必须确保 future 已完成。
  2. 您等待 10 秒,等待来自 BinanceApi 的交易事件。但是有些货币对可能交易很少,所以 10 秒是不够的。

我通过添加适当的等待条件稍微修改了您的代码,一切似乎都运行良好:

//get Redis connection
    RedisClient client = RedisClient.create("redis://localhost");
    StatefulRedisConnection<String, String> connection = client.connect();
    RedisAsyncCommands<String, String> redis = connection.async();
    String symbol = "AGIETH";
    Session session = null;
    CompletableFuture<String> cfBuy = new CompletableFuture<>();
    CompletableFuture<String> cfSell = new CompletableFuture<>();
    try {
        //Open websocket connection. 
        session = (new
            BinanceApi()).websocketTrades(BinanceSymbol.valueOf(symbol), new BinanceWebSocketAdapterAggTrades() {
            @Override
            public void onMessage(BinanceEventAggTrade message) {
                double closeOrderBuy = 0;
                double closeOrderSell = 0;
                //check if we  saved order information before and if yes get data from Redis
                try {
                    if(redis.get(symbol+"Buy").get()!=null )
                    {
                        closeOrderBuy = Double.valueOf(redis.get(symbol+"Buy").get());
                    }
                    if( redis.get(symbol+"Sell").get()!=null)
                    {
                        closeOrderSell = Double.valueOf(redis.get(symbol+"Sell").get());
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                // get current value from exchange
                double currentCloseOrder = message.getPrice().multiply(message.getQuantity()).doubleValue();

                // rewrite data in Redis
                if(message.isMaker()) {
                    closeOrderBuy = currentCloseOrder + closeOrderBuy;
                    redis.set(symbol + "Buy",String.valueOf(closeOrderBuy)).thenAccept(cfBuy::complete);
                }
                else {
                    closeOrderSell = currentCloseOrder + closeOrderSell;
                    redis.set(symbol + "Sell",String.valueOf(closeOrderSell)).thenAccept(cfSell::complete);
                }
            }
        });
    } catch (BinanceApiException e) {
        e.printStackTrace();
    }
    try {
        System.out.println("Futures completed: " + cfBuy.get() + " " + cfSell.get());
        session.close();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //check what we write
    try {
        System.out.println(symbol + redis.get(symbol + "Buy").get() + "  " + redis.get(symbol + "Sell").get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }    

输出:

Futures completed: OK OK
AGIETH0.06922798  0.08610368

关于java - Redis 在运行时获取命令不可预测的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50835623/

相关文章:

java - instanceof 返回 true 后怎么可能有 ClassCastException

redis sentinel 没有将 +sdown 升级为 +odown

thread-safety - redis对数据结构的操作是线程安全的吗

c# - StackExchange.Redis 发送结构

redis - 获取具有给定前缀的所有排序集的大小

Java Web应用程序 : Access Properties File To Update it

java - fxml 中定义的自定义控件 - 如何获取父 ID?

java - 我的资源未加载 - "Input stream must not be null"

java - ClassNotFoundException:com.mysql.jdbc.Driver

redis RESTORE 命令未按预期工作