java - 如何在 Spring Boot Controller 之外正确 @Autowire 数据访问对象?

标签 java spring hibernate spring-boot

我正在开发一个 Spring Boot 应用程序,它充当 REST API,接收 JSON 数据,并将其保存在 Postgres DB 中。 为此,我有 Controller 以及单独的数据访问类。 在我的 Controller 中,我正在使用

 @Autowired
private CusttableDao custtableDao;

自动连接我的 DAO 类,如下所示:

@Repository
@Transactional
public class CusttableDao {

    static final Logger LOG = LoggerFactory.getLogger(CusttableDao.class);

    //EntityManagerFactory emfactory = Persistence.createEntityManagerFactory( "Eclipselink_JPA" );

  public void update(Custtable custtable) {
    entityManager.merge(custtable);
    return;
  }

  public Custtable getById(Class<Custtable> class1, CusttableCompositeKey custtableCompositeKey) {
        Custtable ct = entityManager.find(Custtable.class, custtableCompositeKey);
        return entityManager.find(Custtable.class, custtableCompositeKey);
    }

  public void create(Custtable custtable) {
        entityManager.persist(custtable);
        return;
      }

  public void delete(Custtable custtable) {
        if (entityManager.contains(custtable))
          entityManager.remove(custtable);
        else
          entityManager.remove(entityManager.merge(custtable));
        return;
      }

  @PersistenceContext
  private EntityManager entityManager;
}

就 REST API 服务而言,这种安排效果很好。

最近,我被要求添加从 Kafka 接收数据的功能。

为此,我构建了一个 ThreadPool 和一个 Consumer 类:

@Component
public class ConsumerThreadPool {

    private static final String TOPIC = "test5";
    private static final Integer NUM_THREADS = 1;

    @Autowired
    private ConsumerConfigFactory consumerConfigFactory;

    private ConsumerConnector consumer;
    private ExecutorService threadPool;

    public ConsumerThreadPool() {
        threadPool = Executors.newFixedThreadPool(NUM_THREADS);
    }

    @PostConstruct
    public void startConsuming() {
        ConsumerConfig consumerConfig = consumerConfigFactory.getConsumerConfig();
        consumer = createJavaConsumerConnector(consumerConfig);

        consume();
    }

    public void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, NUM_THREADS);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            threadPool.submit(new ErpConsumer(stream, threadNumber));
            threadNumber++;
        }
    }


public class ErpConsumer implements Runnable {

    @Autowired
    private CusttableDao custtableDao;

    private ObjectMapper objectMapper;
    private KafkaStream<byte[], byte[]> kafkaStream;
    private int threadNumber;

    public ErpConsumer(KafkaStream<byte[], byte[]> kafkaStream, int threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
        this.objectMapper = new ObjectMapper();
    }

    ObjectMapper mapper = new ObjectMapper();

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();

        while (it.hasNext()) {
            byte[] messageData = it.next().message();
            try {
                String msg = new String(messageData);

                JSONArray jsonArray = new JSONArray(msg);

                for (int i = 0; i < jsonArray.length(); i++) {                  
                    JSONObject custtableObject = jsonArray.getJSONObject(i);                  
                      dispatchRecord(custtableObject);
                        }  
                System.out.print(msg + "\n");
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }

        System.out.println("Shutting down Thread: " + kafkaStream);
    }

    private void dispatchRecord(JSONObject record) throws JSONException, JsonParseException, JsonMappingException, IOException{
        String changeTableName = record.getString("upk_changedtablename");
        record.remove("upk_Accountnum");

        switch (changeTableName) {
            case "Custtable":  
                Custtable custTable = mapper.readValue(record.toString(), new TypeReference<Custtable>(){});

                custTable.setPartition( Long.valueOf(record.getString("upk_partition")).longValue());

                Long  keyfieldrecid = custTable.getUpk_keyfieldrecid();
                Long partition = custTable.getUpk_partition();

                if(custTable.getOptype().equals("U")) {

                    Custtable customer = (Custtable) custtableDao.getById(Custtable.class, 
                            new CusttableCompositeKey 
                            (custTable.getUpk_keyfieldrecid(),custTable.getUpk_partition())); 

                BeanUtils.copyProperties(custTable, customer);

                customer.setCusttableCompositeKey(new CusttableCompositeKey 
                        (keyfieldrecid,partition));

                custtableDao.update(customer);
                   }

            default: 
                     break;
            }   
    }

与 Rest Controller 不同,使用

@Autowired
    private CusttableDao custtableDao;

没有帮助 - custtableDao 在这里保持为空:

Custtable customer = (Custtable) custtableDao.getById(Custtable.class, 
                            new CusttableCompositeKey 
                            (custTable.getUpk_keyfieldrecid(),custTable.getUpk_partition()));

我在消费者中自动写入/实例化/访问 DAO(不是 RestController 类型)的正确方法是什么?

最佳答案

您的 ErpConsumer 类不是托管 bean,您在这一行中自己实例化它:

threadPool.submit(new ErpConsumer(stream, threadNumber));

@Autowired 仅适用于由 Spring 管理的 bean。

而是将您的 dao 注入(inject)到您的 ConsumerThreadPool 中,它是一个托管 bean(因为 @Component 注释)。然后将 dao 作为参数添加到 ErpConsumer 的构造函数中,并按如下方式创建它的实例:

threadPool.submit(new ErpConsumer(custtableDao,stream, threadNumber));

您的构造函数可能如下所示:

public ErpConsumer(CusttableDao custtableDao,KafkaStream<byte[], byte[]> kafkaStream, int threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
        this.objectMapper = new ObjectMapper();
        this.custtableDao = custtableDao;
    }

关于java - 如何在 Spring Boot Controller 之外正确 @Autowire 数据访问对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39735161/

相关文章:

java - “变量”不能解析为变量

spring - 客户端密码+在spring oauth2中刷新访问 token

spring - @EnableTransactionManager java 配置

java - hibernate,查询可嵌入对象

java - Hibernate Envers - 请求更改对象列表

java - Spring Boot 不会从 application.yml 中选取值

java - 局部变量类型推断

java - 通配符匹配严格,但servlet.xml中多个元素找不到声明

java - HQL 和内部类(例如构建器)

java - 如何在 dropwizard 上使用 guice 自动连接 HibernateBundle?