java - 以多种方式发送数据,具体取决于您希望如何发送

标签 java oop design-patterns bytebuffer single-responsibility-principle

我有一堆键和值,我想通过将它们打包到一个字节数组中来发送到我们的消息队列。我会将所有键和值组成一个字节数组,这些键和值应始终小于 50K,然后发送到我们的消息队列。

数据包类:

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

下面是我发送数据的方式。截至目前,我的设计只允许通过调用上述 sendData() 方法中的 sendToQueueAsync 方法异步发送数据。

  private void validateAndSend(final RecordPartition partition) {
    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

    final Packet packet = new Packet(partition);

    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null) {
      packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
          dataHolder.getProcessBytes());
    }
    packet.close();
  }

现在我需要扩展我的设计,以便我可以用三种不同的方式发送数据。由用户决定他想要发送数据的方式,“同步”或“异步”。

  • 我需要通过调用 sender.sendToQueueAsync 方法异步发送数据。
  • 或者我需要通过调用 sender.sendToQueueSync 方法同步发送数据。
  • 或者我需要通过调用 sender.sendToQueueSync 方法在特定套接字上同步发送数据。在这种情况下,我需要以某种方式传递 socket 变量,以便 sendData 知道这个变量。

发送记录类:

public class SendRecord {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
      .concurrencyLevel(100).build();

  private static class Holder {
    private static final SendRecord INSTANCE = new SendRecord();
  }

  public static SendRecord getInstance() {
    return Holder.INSTANCE;
  }

  private SendRecord() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetry();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  private void handleRetry() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage message : messages) {
      if (message.hasExpired()) {
        if (message.shouldRetry()) {
          message.markResent();
          doSendAsync(message);
        } else {
          cache.invalidate(message.getAddress());
        }
      }
    }
  }

  // called by multiple threads concurrently
  public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  // called by above method and also by handleRetry method
  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(liveSocket.get().getSocket());
    } finally {
      msg.destroy();
    }
  }

  // called by send method below
  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  // called by multiple threads to send data synchronously without passing socket
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  // called by a threads to send data synchronously but with socket as the parameter
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  public void handleAckReceived(final long address) {
    PendingMessage record = cache.getIfPresent(address);
    if (record != null) {
      record.ackReceived();
      cache.invalidate(address);
    }
  }
}

调用者只会调用以下三种方法之一:

  • sendToQueueAsync 通过传递两个参数
  • 通过传递两个参数sendToQueueSync
  • 通过传递三个参数sendToQueueSync

我应该如何设计我的 PacketSendRecord 类,以便我可以告诉 Packet 类这个数据需要在任何一个中发送以上三种方式到我的消息队列。由用户决定他想以哪种方式将数据发送到消息队列。截至目前,我的 Packet 类的结构方式,它只能以一种方式发送数据。

最佳答案

我认为您最好的选择是策略模式 (https://en.wikipedia.org/wiki/Strategy_pattern)。

使用此模式,您可以封装每种类型的“发送”的行为,例如,AsynchronousSend 类、SynchronousSend 类和AsynchronousSocketSend 类。 (您可能会想出更好的名字)。然后,Packet 类可以根据某些逻辑决定使用哪个类将数据发送到队列。

关于java - 以多种方式发送数据,具体取决于您希望如何发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48201935/

相关文章:

python - 在 Python 中首先将对象属性初始化为类属性

artificial-intelligence - 生成段落模式的AI程序

java - 尝试在每次点击时显示颜色名称检查我的代码

java - 在Java中使用Slider,如何检测 slider 是向右还是向左变化?

python - QTabWidget 不允许我使用自定义类

c++ - 使用 cout 后对象数组被破坏

在游戏实体类中重用代码的 Pythonic 方式

design-patterns - Scala 中针对经常遇到的问题建立了哪些常见模式/解决方案

java - Tomcat - 多个 webapps 文件夹

java - 将元素添加到 List<?扩展父类(super class)>,需要澄清