Posted by Nick Johnson | Filed under app-engine, python, coding, deferred, tech, cursors

Last week, I blogged about cursors, a new feature in version 1.3.1 of the App Engine SDK.
Today, I'm going to demonstrate a practical use for them: Bulk datastore updates.
Most of this method is taken up with exception handling. Other errors are assumed to be user errors, and are caught and logged.
If there's a finite threshold for the maximum number of user errors, the key of the failing entity is recorded, and we abort if we've reached the limit.
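The failure-threshold logic described above can be sketched in plain Python, outside App Engine. This is only an illustration: `ErrorTracker`, `process_all` and `max_failures` are hypothetical names, and the sketch assumes "reached the limit" means the number of recorded keys is at least the limit.

```python
import logging


class ErrorTracker(object):
    """Hypothetical helper that counts per-entity failures against a limit."""

    def __init__(self, max_failures=-1):
        # -1 means "no limit"; in that case failed keys are not retained.
        self.max_failures = max_failures
        self.num_failures = 0
        self.failed_keys = []

    def record_failure(self, key):
        """Records one failure and returns True if processing should abort."""
        self.num_failures += 1
        if self.max_failures < 0:
            return False
        self.failed_keys.append(key)
        # Assumption: abort as soon as the limit has been reached.
        return len(self.failed_keys) >= self.max_failures


def process_all(items, handler, tracker):
    """Runs handler over (key, value) pairs; returns True on success."""
    for key, value in items:
        try:
            handler(value)
        except Exception:
            # Treated as a user error: log it and carry on if allowed.
            logging.exception("Error while processing %r", key)
            if tracker.record_failure(key):
                return False
    return True
```

With no limit set, every item is attempted and only the failure count is kept. With a finite limit, the keys that failed are available for a final report.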
Subclasses should implement, at a minimum, get_query and handle_entity.

    PUT_BATCH_SIZE = 20  # Number of entities to delete() at once.

      """
      raise NotImplementedError()

    def finish(self, success, failed_keys):
      """Finish processing.

      Args:
        success: boolean: Indicates if the process completed successfully, or was
          aborted due to too many errors.

This last one is necessary because tests have shown that the approach used in the deferred article, of catching the first deadline error and enqueueing the next task then, is not sufficiently reliable.
    # -1 indicates no limit, in which case the list of failed keys will not be retained.

As with previous mappers, we want to batch these operations for efficiency.

Now we can implement the code that does the actual work.

    DeadlineExceededError):
      # Give up for now - reschedule for later.
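To make the batching idea concrete, here is a minimal sketch of a buffer that collects items and flushes them in groups. `BatchBuffer` and `flush_fn` are hypothetical names rather than part of the updater itself. On App Engine the flush function would presumably be something like `db.put` or `db.delete`, but here it is any callable that accepts a list.

```python
class BatchBuffer(object):
    """Hypothetical buffer that hands items to flush_fn in fixed-size groups."""

    def __init__(self, flush_fn, batch_size=20):
        self.flush_fn = flush_fn      # Callable taking a list of items.
        self.batch_size = batch_size
        self.pending = []

    def add(self, item):
        """Queues an item, flushing automatically once the batch is full."""
        self.pending.append(item)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        """Sends any queued items in a single call and empties the queue."""
        if self.pending:
            self.flush_fn(self.pending)
            self.pending = []
```

A final explicit `flush()` is needed before the work is rescheduled or finished, otherwise a partially filled batch would be lost.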
Cursors allow an application to retrieve a query's results in convenient batches, and are recommended over using integer offsets for pagination.
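The difference from offset-based paging can be illustrated with an in-memory stand-in. This does not use the datastore API. `fetch_batch` and `iterate_in_batches` are hypothetical, and the "cursor" here is simply the last key returned, which is an assumption made for the sake of a runnable example.

```python
import bisect


def fetch_batch(sorted_keys, cursor=None, batch_size=3):
    """Returns (batch, next_cursor), resuming just after `cursor`."""
    # Resume from the position after the last key seen, rather than
    # skipping a fixed number of rows as an integer offset would.
    start = 0 if cursor is None else bisect.bisect_right(sorted_keys, cursor)
    batch = sorted_keys[start:start + batch_size]
    next_cursor = batch[-1] if batch else None
    return batch, next_cursor


def iterate_in_batches(sorted_keys, batch_size=3):
    """Collects every batch, carrying the cursor from one fetch to the next."""
    batches = []
    cursor = None
    while True:
        batch, next_cursor = fetch_batch(sorted_keys, cursor, batch_size)
        if not batch:
            break
        batches.append(batch)
        cursor = next_cursor
    return batches
```

Because each fetch resumes from a position rather than a count, the cursor can be handed to a later request or task and the scan picks up where it left off.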
See Queries for more information on structuring queries for your app.
          \n\n" % self.__class__
          subject = "Bulk update completed"
        else:
          message = "Bulk update job %s failed.\n\n" % self.__class__
          subject = "Bulk update FAILED"
        message += ("Processed %d entities in %d tasks, putting %d and deleting %d\n\n"
                    % (self.num_processed, self.num_tasks, self.num_put,
                       self.num_deleted))
        if failed_keys:
          message += "Processing failed for the following keys:\n"
          for key in failed_keys:
            message += "%r\n" % key
        mail.send_mail_to_admins(self.email_sender, subject, message)

This mixin simply extends the finish() method and, if a sender address is provided, sends an email from it to all the app's admins, giving a brief report of the process's completion or failure.
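The way such a mixin layers on top of a base class's finish() can be shown with a small standalone sketch. The names `BaseJob`, `ReportingSketchMixin`, `ReportingJob` and the `send_fn` parameter are all hypothetical. `send_fn` stands in for the mail call so that the example runs without App Engine.

```python
class BaseJob(object):
    """Hypothetical stand-in for the updater base class."""

    def __init__(self):
        self.finished_with = None

    def finish(self, success, failed_keys):
        self.finished_with = (success, list(failed_keys))


class ReportingSketchMixin(object):
    """Extends finish() with a report, then defers to the next class."""

    def __init__(self, email_sender=None, send_fn=None):
        super(ReportingSketchMixin, self).__init__()
        self.email_sender = email_sender
        # Assumption: send_fn(sender, subject, body) replaces the mail API.
        self.send_fn = send_fn

    def finish(self, success, failed_keys):
        # Run the base behaviour first, via the method resolution order.
        super(ReportingSketchMixin, self).finish(success, failed_keys)
        if not self.email_sender:
            return
        subject = "Bulk update completed" if success else "Bulk update FAILED"
        body = "".join("%r\n" % key for key in failed_keys)
        self.send_fn(self.email_sender, subject, body)


class ReportingJob(ReportingSketchMixin, BaseJob):
    """The mixin is listed first so its finish() wraps the base one."""
```

Listing the mixin before the base class matters: it puts the mixin's finish() first in the method resolution order, so its super() call reaches the base implementation.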
Finally, we can define a couple of simple classes for commonly used types of update operation:

    class BulkPut(ReportingMixin, BulkUpdater):
      def __init__(self, query, email_sender=None):
        super(BulkPut, self).__init__(email_sender)
        self.query = query

      def get_query(self):
        return self.query

      def handle_entity(self, entity):
        self.put(entity)

    class BulkDelete(ReportingMixin, BulkUpdater):
      def __init__(self, query, email_sender=None):
        super(BulkDelete, self).__init__(email_sender)
        self.query = query

      def get_query(self):
        return self.query

      def handle_entity(self, entity):
        self.delete(entity)

These two classes are almost identical, except for the operation carried out on each entity.