I am building a Spring Boot application that acts as a REST API: it receives JSON data and saves it in a Postgres DB. For this I have a Controller plus a separate data access class. In my Controller I use

@Autowired
private CusttableDao custtableDao;

to autowire my DAO class, which looks like this:
@Repository
@Transactional
public class CusttableDao {

    static final Logger LOG = LoggerFactory.getLogger(CusttableDao.class);

    //EntityManagerFactory emfactory = Persistence.createEntityManagerFactory( "Eclipselink_JPA" );

    @PersistenceContext
    private EntityManager entityManager;

    public void update(Custtable custtable) {
        entityManager.merge(custtable);
    }

    public Custtable getById(Class<Custtable> class1, CusttableCompositeKey custtableCompositeKey) {
        return entityManager.find(Custtable.class, custtableCompositeKey);
    }

    public void create(Custtable custtable) {
        entityManager.persist(custtable);
    }

    public void delete(Custtable custtable) {
        if (entityManager.contains(custtable))
            entityManager.remove(custtable);
        else
            entityManager.remove(entityManager.merge(custtable));
    }
}
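For reference, the Controller itself is not shown in the question; a minimal sketch of how such a controller might use the autowired DAO could look like the following (the endpoint path, method name and response handling are assumptions, not the original code):

@RestController
@RequestMapping("/custtable")
public class CusttableController {

    // Field injection works here because Spring creates and manages the controller bean
    @Autowired
    private CusttableDao custtableDao;

    // Hypothetical endpoint: receive JSON, let Jackson bind it to the entity, persist it
    @PostMapping
    public ResponseEntity<Void> create(@RequestBody Custtable custtable) {
        custtableDao.create(custtable);
        return ResponseEntity.status(HttpStatus.CREATED).build();
    }
}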
As far as the REST API service is concerned, this arrangement works fine.
Recently I was asked to add the ability to also receive data from Kafka.
For that I built a ThreadPool and a Consumer class:
@Component
public class ConsumerThreadPool {

    private static final String TOPIC = "test5";
    private static final Integer NUM_THREADS = 1;

    @Autowired
    private ConsumerConfigFactory consumerConfigFactory;

    private ConsumerConnector consumer;
    private ExecutorService threadPool;

    public ConsumerThreadPool() {
        threadPool = Executors.newFixedThreadPool(NUM_THREADS);
    }

    @PostConstruct
    public void startConsuming() {
        ConsumerConfig consumerConfig = consumerConfigFactory.getConsumerConfig();
        consumer = createJavaConsumerConnector(consumerConfig);
        consume();
    }

    public void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, NUM_THREADS);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            threadPool.submit(new ErpConsumer(stream, threadNumber));
            threadNumber++;
        }
    }
}
public class ErpConsumer implements Runnable {

    @Autowired
    private CusttableDao custtableDao;

    private ObjectMapper objectMapper;
    private KafkaStream<byte[], byte[]> kafkaStream;
    private int threadNumber;

    public ErpConsumer(KafkaStream<byte[], byte[]> kafkaStream, int threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
        this.objectMapper = new ObjectMapper();
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
        while (it.hasNext()) {
            byte[] messageData = it.next().message();
            try {
                String msg = new String(messageData);
                JSONArray jsonArray = new JSONArray(msg);
                for (int i = 0; i < jsonArray.length(); i++) {
                    JSONObject custtableObject = jsonArray.getJSONObject(i);
                    dispatchRecord(custtableObject);
                }
                System.out.print(msg + "\n");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Shutting down Thread: " + kafkaStream);
    }

    private void dispatchRecord(JSONObject record) throws JSONException, JsonParseException, JsonMappingException, IOException {
        String changeTableName = record.getString("upk_changedtablename");
        record.remove("upk_Accountnum");
        switch (changeTableName) {
            case "Custtable":
                Custtable custTable = objectMapper.readValue(record.toString(), new TypeReference<Custtable>() {});
                custTable.setPartition(Long.valueOf(record.getString("upk_partition")).longValue());
                Long keyfieldrecid = custTable.getUpk_keyfieldrecid();
                Long partition = custTable.getUpk_partition();
                if (custTable.getOptype().equals("U")) {
                    Custtable customer = (Custtable) custtableDao.getById(Custtable.class,
                            new CusttableCompositeKey(custTable.getUpk_keyfieldrecid(), custTable.getUpk_partition()));
                    BeanUtils.copyProperties(custTable, customer);
                    customer.setCusttableCompositeKey(new CusttableCompositeKey(keyfieldrecid, partition));
                    custtableDao.update(customer);
                }
                break;
            default:
                break;
        }
    }
}
Unlike in the Rest Controller, using

@Autowired
private CusttableDao custtableDao;

does not help here - custtableDao stays null when this line runs:

Custtable customer = (Custtable) custtableDao.getById(Custtable.class,
        new CusttableCompositeKey(custTable.getUpk_keyfieldrecid(), custTable.getUpk_partition()));

What is the correct way to autowire/instantiate/access the DAO in my consumer (which is not a RestController type)?
Best Answer
Your ErpConsumer class is not a managed bean; you instantiate it yourself in this line:

threadPool.submit(new ErpConsumer(stream, threadNumber));

@Autowired only works on beans that are managed by Spring.

Instead, inject your DAO into ConsumerThreadPool, which is a managed bean (because of its @Component annotation). Then add the DAO as a parameter to ErpConsumer's constructor and create the instance like this:

threadPool.submit(new ErpConsumer(custtableDao, stream, threadNumber));

Your constructor might then look like this:
public ErpConsumer(CusttableDao custtableDao, KafkaStream<byte[], byte[]> kafkaStream, int threadNumber) {
    this.threadNumber = threadNumber;
    this.kafkaStream = kafkaStream;
    this.objectMapper = new ObjectMapper();
    this.custtableDao = custtableDao;
}
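Putting the pieces together, the ConsumerThreadPool from the question would then carry the injected DAO and hand it to each worker - a sketch combining the original class with the suggested change:

@Component
public class ConsumerThreadPool {

    private static final String TOPIC = "test5";
    private static final Integer NUM_THREADS = 1;

    @Autowired
    private ConsumerConfigFactory consumerConfigFactory;

    // Inject the DAO here, where Spring manages the bean...
    @Autowired
    private CusttableDao custtableDao;

    private ConsumerConnector consumer;
    private ExecutorService threadPool;

    public ConsumerThreadPool() {
        threadPool = Executors.newFixedThreadPool(NUM_THREADS);
    }

    @PostConstruct
    public void startConsuming() {
        ConsumerConfig consumerConfig = consumerConfigFactory.getConsumerConfig();
        consumer = createJavaConsumerConnector(consumerConfig);
        consume();
    }

    public void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, NUM_THREADS);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            // ...and pass it to each worker explicitly instead of relying on @Autowired
            threadPool.submit(new ErpConsumer(custtableDao, stream, threadNumber));
            threadNumber++;
        }
    }
}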
Regarding java - How to properly @Autowire a data access object outside of a Spring Boot Controller, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/39735161/