问题描述
我正在尝试使用this CosmosDB sample实现批量插入。此示例是使用.NET Core 3. *创建的,并支持System.Text.Json。
使用CreateItemAsync方法时,它可以完美运行:
var concurrentTasks = new List<Task<ItemResponse<Notification>>>();
foreach (var entity in entities)
{
entity.Id = GenerateId(entity);
var requestOptions = new ItemRequestOptions();
requestOptions.EnableContentResponSEOnWrite = false; // We don't need to get the entire body returend.
concurrentTasks.Add(Container.CreateItemAsync(entity,new PartitionKey(entity.UserId),requestOptions));
}
await Task.WhenAll(concurrentTasks);
但是,我试图看看是否可以通过直接将数据流传输到CosmosDB中来减少RU的数量,希望CosmosDB不会因为反序列化JSON本身而向我收费。
我正在.NET Core 2.1和Newtonsoft.Json中工作。这是我的代码,不返回成功的状态代码。响应标头中的子状态代码为“ 0”。
Notification[] notifications = entities.ToArray();
var itemsToInsert = new Dictionary<PartitionKey,Stream>();
foreach (var notification in notifications)
{
MemoryStream ms = new MemoryStream();
StreamWriter writer = new StreamWriter(ms);
JsonTextWriter jsonWriter = new JsonTextWriter(writer);
JsonSerializer ser = new JsonSerializer();
ser.Serialize(jsonWriter,notification);
await jsonWriter.FlushAsync();
await writer.FlushAsync();
itemsToInsert.Add(new PartitionKey(notification.UserId),ms);
}
List<Task> tasks = new List<Task>(notifications.Length);
foreach (keyvaluePair<PartitionKey,Stream> item in itemsToInsert)
{
tasks.Add(Container.CreateItemStreamAsync(item.Value,item.Key)
.ContinueWith((Task<ResponseMessage> task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessstatusCode)
{
Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
}
else
{
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
response.StatusCode:BadRequest response.ErrorMessage:空
我假设我没有以正确的方式序列化到Stream中。有人知道了吗?
更新
我发现新的System.Text.Json包也实现了.NET Standard 2.0,因此我从NUget安装了它。现在,我可以从前面提到的Github复制示例代码。
Notification[] notifications = entities.ToArray();
var itemsToInsert = new List<Tuple<PartitionKey,Stream>>();
foreach (var notification in notifications)
{
notification.id = $"{notification.UserId}:{Guid.NewGuid()}";
MemoryStream stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream,notification);
itemsToInsert.Add(new Tuple<PartitionKey,Stream>(new PartitionKey(notification.RoleId),stream));
}
List<Task> tasks = new List<Task>(notifications.Length);
foreach (var item in itemsToInsert)
{
tasks.Add(Container.CreateItemStreamAsync(item.Item2,item.Item1)
.ContinueWith((Task<ResponseMessage> task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessstatusCode)
{
Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
}
else
{
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
我仔细检查了BulkInsert是否已启用(否则第一种方法也将不起作用)。仍然有BadRequest和errorMessage为NULL。
我还检查了BadRequest为何没有将数据添加到容器中。
解决方法
我发现了问题。
我已使用以下选项设置了Cosmos上下文:
var cosmosSerializationOptions = new CosmosSerializationOptions();
cosmosSerializationOptions.PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase;
CosmosClientOptions cosmosClientOptions = new CosmosClientOptions();
cosmosClientOptions.SerializerOptions = cosmosSerializationOptions;
采用 CamelCase
约定。在我的第一个(有效的)代码示例中,我将CosmosDB Context反序列化为JSON。他将使用CamelCase约定进行序列化,因此我的PartionKey UserId
将被序列化为userId
。
但是,为了减少一些RU,我将使用CreateItemStreamAsync
来使我负责序列化。而且有一个错误,我的属性定义如下:
public int UserId { get; set; }
因此他将被序列化为json UserId: 1
。
但是,分区键被定义为/userId
。因此,如果我添加JsonPropertyName属性,它将起作用:
[JsonPropertyName("userId")]
public int UserId { get; set; }
...如果只有错误消息会告诉我。
使用此CreateItemStream
方法可以节省大约3%的RU。但是,随着时间的流逝,我估计这会慢慢节省一些RU。
似乎流不可读。因此提出了不好的要求。
我将很少修改MemoryStream
的创建方式:
foreach (var notification in notifications)
{
itemsToInsert.Add(new PartitionKey(notification.UserId),new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(notification))));
}
当然,我将Newtonsoft.json用于jsonConvert。