java - Bulk upload/import JSON files into Azure Cosmos DB from Java code

Tags: java json azure azure-cosmosdb

I am generating a JSON file in Java. The file contains a list of JSON documents. I want to import it into Azure Cosmos DB right after it is created.

Is there any way to do this from Java code?

Thanks in advance!

Best Answer

Based on my research, if we want to perform bulk operations from Java, we can use the Azure Cosmos DB bulk executor library for Java. For an overview of the library and details on how to use it, refer to the official Azure Cosmos DB bulk executor documentation.

For example:

  1. My .json file
[{
        "id": "1",
        "name": "test1",
        "age": "20"
    }, {
        "id": "2",
        "name": "test2",
        "age": "21"
    }, {
        "id": "3",
        "name": "test3",
        "age": "22"
    }, {
        "id": "4",
        "name": "test4",
        "age": "23"
    },
    {
        "id": "5",
        "name": "test5",
        "age": "24"
    }, {
        "id": "6",
        "name": "test6",
        "age": "25"
    }, {
        "id": "7",
        "name": "test7",
        "age": "26"
    }, {
        "id": "8",
        "name": "test8",
        "age": "27"
    }
]
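The question mentions generating this file from Java in the first place. As a sketch using only the JDK (no JSON library), a file with the same shape could be produced like this; `buildJsonArray` and the temp-file path are illustrative choices, not part of the original answer:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class JsonFileWriter {
    // Build a JSON array of simple {id, name, age} documents as a string.
    static String buildJsonArray(int count) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 1; i <= count; i++) {
            // id runs 1..count, age runs 20..(19 + count), matching the sample file
            sb.append(String.format(
                "{\"id\": \"%d\", \"name\": \"test%d\", \"age\": \"%d\"}", i, i, 19 + i));
            if (i < count) {
                sb.append(", ");
            }
        }
        return sb.append("]").toString();
    }

    public static void main(String[] args) throws IOException {
        // Write the array to a temp file (stand-in for e:\test.json).
        Path out = Files.createTempFile("test", ".json");
        Files.writeString(out, buildJsonArray(8));
        System.out.println(Files.readString(out).contains("\"name\": \"test8\"")); // prints true
    }
}
```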

  2. My pom.xml

        <dependency>
          <groupId>com.microsoft.azure</groupId>
          <artifactId>documentdb-bulkexecutor</artifactId>
          <version>2.6.0</version>
        </dependency>
    
        <dependency>
          <groupId>com.googlecode.json-simple</groupId>
          <artifactId>json-simple</artifactId>
          <version>1.1.1</version>
        </dependency>
    
  3. Code

            // Required imports (omitted here): com.microsoft.azure.documentdb.*,
            // com.microsoft.azure.documentdb.bulkexecutor.*,
            // org.json.simple.JSONArray, org.json.simple.parser.JSONParser,
            // org.json.simple.parser.ParseException, java.io.*, java.util.*
            String endpoint="<your cosmos db endpoint>";
            String key="<your key>";
            ConnectionPolicy connectionPolicy = new ConnectionPolicy();
            connectionPolicy.setMaxPoolSize(1000);
            DocumentClient client = new DocumentClient(
                    endpoint,
                    key,
                    connectionPolicy,
                    ConsistencyLevel.Session);
            String databaseId="testbulk";
            String collectionId="items";
            String databaseLink = String.format("/dbs/%s", databaseId);
            String collectionLink = String.format("/dbs/%s/colls/%s", databaseId, collectionId);
    
            ResourceResponse<Database> databaseResponse = null;
            Database readDatabase = null;
            try {
                databaseResponse = client.readDatabase(databaseLink, null);
                readDatabase = databaseResponse.getResource();
    
                System.out.println("Database already exists...");
    
            } catch (DocumentClientException dce) {
                if (dce.getStatusCode() == 404) {
                    System.out.println("Attempting to create database since non-existent...");
    
                    Database databaseDefinition = new Database();
                    databaseDefinition.setId(databaseId);

                    client.createDatabase(databaseDefinition, null);

                    databaseResponse = client.readDatabase(databaseLink, null);
                    readDatabase = databaseResponse.getResource();
                } else {
                    throw dce;
                }
            }
    
            ResourceResponse<DocumentCollection> collectionResponse = null;
            DocumentCollection readCollection = null;
    
            try {
                collectionResponse = client.readCollection(collectionLink, null);
                readCollection = collectionResponse.getResource();
    
                System.out.println("Collection already exists...");
            } catch (DocumentClientException dce) {
                if (dce.getStatusCode() == 404) {
                    System.out.println("Attempting to create collection since non-existent...");
    
                    DocumentCollection collectionDefinition = new DocumentCollection();
                    collectionDefinition.setId(collectionId);
    
                    PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition();
                    Collection<String> paths = new ArrayList<String>();
                    paths.add("/id");
                    partitionKeyDefinition.setPaths(paths);
                    collectionDefinition.setPartitionKey(partitionKeyDefinition);
    
                    RequestOptions options = new RequestOptions();
                    // Provisioned throughput (RU/s) for the new collection; adjust to your needs
                    options.setOfferThroughput(1000000);
    
                    // create a collection
                    client.createCollection(databaseLink, collectionDefinition, options);
    
                    collectionResponse = client.readCollection(collectionLink, null);
                    readCollection = collectionResponse.getResource();
                } else {
                    throw dce;
                }
            }
    
            System.out.println(readCollection.getId());
            System.out.println(readDatabase.getId());
    
            ArrayList<String> list = new ArrayList<String>();
            JSONParser jsonParser = new JSONParser();
            try (FileReader reader = new FileReader("e:\\test.json")) {
    
                //Read JSON file
                Object obj = jsonParser.parse(reader);
    
                JSONArray jsonArray  = (JSONArray) obj;
                System.out.println(jsonArray);
                // convert the JSONArray entries to a list of JSON strings
                if (jsonArray  != null) {
                    int len = jsonArray.size();
                    for (int i=0;i<len;i++){
                        list.add(jsonArray.get(i).toString());
                    }
                }
                System.out.println(list.get(0));
    
            } catch (IOException | ParseException e) {
                // FileNotFoundException is an IOException, so one catch covers both
                e.printStackTrace();
            }
            // Set client's retry options high for initialization
            client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(30);
            client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(9);
    
           // Builder pattern
            DocumentBulkExecutor.Builder bulkExecutorBuilder = DocumentBulkExecutor.builder().from(
                    client,
                    databaseId,
                    collectionId,
                    readCollection.getPartitionKey(),
                    20000); // throughput you want to allocate for bulk import out of the container's total throughput
    
             // Instantiate DocumentBulkExecutor
            try {
                DocumentBulkExecutor bulkExecutor = bulkExecutorBuilder.build();
                // Set retries to 0 to pass complete control to bulk executor
                client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
                client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
                BulkImportResponse bulkImportResponse = bulkExecutor.importAll(list, false, false, null);
                System.out.println(bulkImportResponse.getNumberOfDocumentsImported());
            } catch (Exception e) {
                e.printStackTrace();
            }
    
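Since the collection above is partitioned on `/id`, every imported document must carry an `id` property, or the bulk import will fail for that document. A quick JDK-only sanity check before calling `importAll` might look like this (a naive string-based sketch; `allHaveId` is an illustrative helper, not part of the bulk executor API):

```java
import java.util.List;

public class PartitionKeyCheck {
    // Naive guard: every document string should mention an "id" property.
    // (A real check would parse the JSON; this only does string matching.)
    static boolean allHaveId(List<String> docs) {
        for (String d : docs) {
            if (!d.contains("\"id\"")) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> docs = List.of(
            "{\"id\":\"1\",\"name\":\"test1\",\"age\":\"20\"}",
            "{\"name\":\"no-id\"}");
        System.out.println(allHaveId(docs)); // prints false
    }
}
```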
    


    Regarding "java - Bulk upload/import JSON files into Azure Cosmos DB from Java code", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/59261927/
