我有一个监听端口的外部 TCP 服务器,一旦客户端成功建立连接,它将开始推送数据(可以将其视为典型的发布-订阅模型)。只有初始请求将从应用程序向服务器创建套接字连接,不会发送其他请求。服务器在有数据时推送数据。 这里的问题是,我使用 TCP 出站端点与服务器建立了连接,但是我如何连续监听我的出站创建的套接字以接收服务器发布的数据?
最佳答案
将客户端套接字连接共享给监听器的一种方法是在 TCP 出站连接器中使用自定义 MessageDispatcher,例如 -
<tcp:connector name="TCP2" doc:name="TCP connector"
clientSoTimeout="70000" receiveBacklog="0" receiveBufferSize="0"
sendBufferSize="0" serverSoTimeout="70000" socketSoLinger="0"
validateConnections="true" keepAlive="true" sendTcpNoDelay="true"
keepSendSocketOpen="true">
<receiver-threading-profile
maxThreadsActive="1" maxThreadsIdle="1" />
<reconnect-forever />
<service-overrides dispatcherFactory="CustomMessageDispatcherFactory"/>
</tcp:connector>
你会有一个像这样的调度器工厂类
import org.mule.api.MuleException;
import org.mule.api.endpoint.OutboundEndpoint;
import org.mule.api.transport.MessageDispatcher;
import org.mule.transport.tcp.TcpMessageDispatcherFactory;
public class CustomMessageDispatcherFactory extends TcpMessageDispatcherFactory {
public MessageDispatcher create(OutboundEndpoint endpoint) throws MuleException
{
return new CustomMessageDispatcher(endpoint);
}
}
还有一个如下所示的 CustomMessageDispatcher.class -
import org.mule.api.MuleEvent;
import org.mule.api.MuleMessage;
import org.mule.api.endpoint.OutboundEndpoint;
import org.mule.api.transformer.TransformerException;
import org.mule.transport.AbstractMessageDispatcher;
import org.mule.transport.NullPayload;
import org.mule.transport.tcp.TcpConnector;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.Socket;
/**
* Send transformed Mule events over TCP.
*/
public class CustomMessageDispatcher extends AbstractMessageDispatcher
{
private final TcpConnector connector;
public CustomMessageDispatcher (OutboundEndpoint endpoint)
{
super(endpoint);
this.connector = (TcpConnector) endpoint.getConnector();
}
@Override
protected synchronized void doDispatch(MuleEvent event) throws Exception
{
/* Share the socket with the mule flow as a session variable */
Socket socket = connector.getSocket(endpoint);
event.getMessage().setInvocationProperty("ClientSocket", socket);
/* If you have something to be dispatched, you can use the below section of code */
try
{
dispatchToSocket(socket, event);
}
finally
{
connector.releaseSocket(socket, endpoint);
}
}
@Override
protected MuleMessage doSend(MuleEvent event) throws Exception {
// Not used since we do not do request-response for the outbound endpoint
return null;
}
}
如果你想监听同一个套接字,你也可以使用相同的类来监听。您可以为 doSend() 方法编写自己的实现。当端点设置为“请求-响应”并且它接收字节数组并在将其分配为有效负载后将其发送回 mule 流时,将触发此方法。请引用类 org.mule.transport.tcp.TcpMessageDispatcher.class 以了解这些方法的默认用法。 仅供引用 - 我自己没有测试过上述逻辑,但我有一个类似的实现来跨 mule 流发送套接字对象。希望对您有所帮助。
关于sockets - MULE- 如何监听由出站端点创建的客户端套接字,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41360292/