如何使用.NET Core 2.1和Stream API在Cosmos DB中批量插入

问题描述

我正在尝试使用this CosmosDB sample实现批量插入。此示例是使用.NET Core 3. *创建的,并支持System.Text.Json。

使用CreateItemAsync方法时,它可以完美运行:

    var concurrentTasks = new List<Task<ItemResponse<Notification>>>();
    foreach (var entity in entities)
    {
        entity.Id = GenerateId(entity);

        var requestOptions = new ItemRequestOptions();
        requestOptions.EnableContentResponSEOnWrite = false; // We don't need to get the entire body returend.
        concurrentTasks.Add(Container.CreateItemAsync(entity,new PartitionKey(entity.UserId),requestOptions));
    }

    await Task.WhenAll(concurrentTasks);

但是,我试图看看是否可以通过直接将数据流传输到CosmosDB中来减少RU的数量,希望CosmosDB不会因为反序列化JSON本身而向我收费。

我正在.NET Core 2.1和Newtonsoft.Json中工作。这是我的代码,不返回成功的状态代码。响应标头中的子状态代码为“ 0”。

    Notification[] notifications = entities.ToArray();
    var itemsToInsert = new Dictionary<PartitionKey,Stream>();

    foreach (var notification in notifications)
    {
        MemoryStream ms = new MemoryStream();
        StreamWriter writer = new StreamWriter(ms);
        JsonTextWriter jsonWriter = new JsonTextWriter(writer);
        JsonSerializer ser = new JsonSerializer();
                
        ser.Serialize(jsonWriter,notification);

        await jsonWriter.FlushAsync();
        await writer.FlushAsync();

        itemsToInsert.Add(new PartitionKey(notification.UserId),ms);
    }

    List<Task> tasks = new List<Task>(notifications.Length);
    foreach (keyvaluePair<PartitionKey,Stream> item in itemsToInsert)
    {
        tasks.Add(Container.CreateItemStreamAsync(item.Value,item.Key)
            .ContinueWith((Task<ResponseMessage> task) =>
            {
                using (ResponseMessage response = task.Result)
                {
                    if (!response.IsSuccessstatusCode)
                    {
                        Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
                    }
                    else
                    {
                    }
                }
            }));
    }

    // Wait until all are done
    await Task.WhenAll(tasks);

response.StatusCode:BadRequest response.ErrorMessage:空

我假设我没有以正确的方式序列化到Stream中。有人知道了吗?

更新

我发现新的System.Text.Json包也实现了.NET Standard 2.0,因此我从NUget安装了它。现在,我可以从前面提到的Github复制示例代码

        Notification[] notifications = entities.ToArray();
        var itemsToInsert = new List<Tuple<PartitionKey,Stream>>();

        foreach (var notification in notifications)
        {
            notification.id = $"{notification.UserId}:{Guid.NewGuid()}";

            MemoryStream stream = new MemoryStream();
            await JsonSerializer.SerializeAsync(stream,notification);

            itemsToInsert.Add(new Tuple<PartitionKey,Stream>(new PartitionKey(notification.RoleId),stream));
        }

        List<Task> tasks = new List<Task>(notifications.Length);
        foreach (var item in itemsToInsert)
        {
            tasks.Add(Container.CreateItemStreamAsync(item.Item2,item.Item1)
                .ContinueWith((Task<ResponseMessage> task) =>
                {
                    using (ResponseMessage response = task.Result)
                    {
                        if (!response.IsSuccessstatusCode)
                        {
                            Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
                        }
                        else
                        {
                        }
                    }
                }));
        }

        // Wait until all are done
        await Task.WhenAll(tasks);

我仔细检查了BulkInsert是否已启用(否则第一种方法也将不起作用)。仍然有BadRequest和errorMessage为NULL。

我还检查了BadRequest为何没有将数据添加到容器中。

解决方法

我发现了问题。

我已使用以下选项设置了Cosmos上下文:

var cosmosSerializationOptions = new CosmosSerializationOptions();
cosmosSerializationOptions.PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase;

CosmosClientOptions cosmosClientOptions = new CosmosClientOptions();
cosmosClientOptions.SerializerOptions = cosmosSerializationOptions;

采用 CamelCase 约定。在我的第一个(有效的)代码示例中,我将CosmosDB Context反序列化为JSON。他将使用CamelCase约定进行序列化,因此我的PartionKey UserId将被序列化为userId

但是,为了减少一些RU,我将使用CreateItemStreamAsync来使我负责序列化。而且有一个错误,我的属性定义如下:

public int UserId { get; set; }

因此他将被序列化为json UserId: 1

但是,分区键被定义为/userId。因此,如果我添加JsonPropertyName属性,它将起作用:

[JsonPropertyName("userId")]
public int UserId { get; set; } 

...如果只有错误消息会告诉我。

使用此CreateItemStream方法可以节省大约3%的RU。但是,随着时间的流逝,我估计这会慢慢节省一些RU。

,

似乎流不可读。因此提出了不好的要求。 我将很少修改MemoryStream的创建方式:

foreach (var notification in notifications)
    {
        
        itemsToInsert.Add(new PartitionKey(notification.UserId),new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(notification))));
    }

当然,我将Newtonsoft.json用于jsonConvert。