Easy scaling with Cosmos DB for large loads
While working with a customer, we need to load a batch of 40-50K documents into Cosmos DB. We could simply let the load run at the collection's normal throughput, but that results in quite a few HTTP 429 RequestRateTooLarge responses. The throttled operations will be retried, but the load also hurts other clients accessing the same data, potentially causing a cascade of 429s and degraded performance for those clients, with real humans waiting.
To prevent this, we can scale up to the required request units before the load starts. This is easily done through code:
Offer offer = _documentClient.CreateOfferQuery().Where(o => o.ResourceLink == collection.SelfLink).AsEnumerable().Single();
await _documentClient.ReplaceOfferAsync(new OfferV2(offer, HighThroughputRequestUnits));
After processing our batch we can scale down again using the same technique.
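As a rough sketch, the scale-up and scale-down steps could be pulled into a pair of helpers. This assumes the same Microsoft.Azure.DocumentDB SDK types used in this post; the names ThroughputScaling, ScaleUpAsync and RestoreAsync are hypothetical and only serve as illustration.

```csharp
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

// Hypothetical helper class, not part of the SDK.
public static class ThroughputScaling
{
    // Raises the collection's throughput and returns the original offer,
    // so the caller can restore it once the batch is done.
    public static async Task<Offer> ScaleUpAsync(
        IDocumentClient client, DocumentCollection collection, int requestUnits)
    {
        // Each collection has exactly one offer, matched by its self link.
        Offer original = client.CreateOfferQuery()
            .Where(o => o.ResourceLink == collection.SelfLink)
            .AsEnumerable()
            .Single();

        await client.ReplaceOfferAsync(new OfferV2(original, requestUnits));
        return original;
    }

    // Scales down by replacing the current offer with the one saved earlier.
    public static Task RestoreAsync(IDocumentClient client, Offer original)
    {
        return client.ReplaceOfferAsync(original);
    }
}
```

Keeping the original offer around means the scale-down step does not need to know what the previous throughput was.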
Provided we have an IBatchJob interface, we can then use the decorator pattern to wrap our batch job in a HighThroughputBatchJobDecorator, which scales up Cosmos DB, executes the original batch job, and scales back down again.
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Xml.Linq;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

public class HighThroughputBatchJobDecorator : IBatchJob
{
    private const int HighThroughputRequestUnits = 10000;

    private readonly IBatchJob _batchJob;
    private readonly IDocumentClient _documentClient;
    private readonly IDocumentDBConfig _config;

    public HighThroughputBatchJobDecorator(IBatchJob batchJob, IDocumentClient documentClient, IDocumentDBConfig config)
    {
        _batchJob = batchJob;
        _documentClient = documentClient;
        _config = config;
    }

    public async Task RunAsync(Uri feedUri, XElement feedXml)
    {
        // Holds the original offer so the finally block can restore it.
        Offer offer = null;
        try
        {
            // Look up the collection and its current offer.
            DocumentCollection collection = _documentClient.CreateDocumentCollectionQuery(UriFactory.CreateDatabaseUri(_config.DatabaseName))
                .Where(c => c.Id == _config.CollectionId).ToArray().Single();
            offer = _documentClient.CreateOfferQuery().Where(o => o.ResourceLink == collection.SelfLink).AsEnumerable().Single();

            // Scale up
            await _documentClient.ReplaceOfferAsync(new OfferV2(offer, HighThroughputRequestUnits));
            Trace.WriteLine("Running with high throughput");

            // Run the wrapped batch job
            await _batchJob.RunAsync(feedUri, feedXml);
        }
        finally
        {
            if (offer != null)
            {
                // Scale down, even if the batch job failed
                await _documentClient.ReplaceOfferAsync(offer);
                Trace.WriteLine("Collection scaled down from high throughput");
            }
        }
    }
}
Pretty neat ;)