Source code for pyelasticsearch.utils

def bulk_chunks(actions, docs_per_chunk=300, bytes_per_chunk=None):
    """
    Return groups of bulk-indexing operations to send to
    :meth:`~pyelasticsearch.ElasticSearch.bulk()`.

    Return an iterable of chunks, each of which is a JSON-encoded line or
    pair of lines in the format understood by ES's bulk API.

    :arg actions: An iterable of bulk actions, JSON-encoded. The best idea is
        to pass me a list of the outputs from
        :meth:`~pyelasticsearch.ElasticSearch.index_op()`,
        :meth:`~pyelasticsearch.ElasticSearch.delete_op()`, and
        :meth:`~pyelasticsearch.ElasticSearch.update_op()`.
    :arg docs_per_chunk: The number of documents (or, more technically,
        actions) to put in each chunk. Set to None to use only
        ``bytes_per_chunk``.
    :arg bytes_per_chunk: The maximum number of bytes of HTTP body payload to
        put in each chunk. Leave at None to use only ``docs_per_chunk``. This
        option helps prevent timeouts when you have occasional very large
        documents. Without it, you may get unlucky: several large docs might
        land in one chunk, and ES might time out.

    Chunks are capped by ``docs_per_chunk`` or ``bytes_per_chunk``, whichever
    is reached first. Obviously, we cannot make a chunk smaller than its
    smallest doc, but we do the best we can. If both ``docs_per_chunk`` and
    ``bytes_per_chunk`` are None, all docs end up in one big chunk (and you
    might as well not use this at all).
    """
    chunk = []
    docs = bytes = 0
    for action in actions:
        next_len = len(action) + 1  # +1 for the trailing \n
        if chunk and ((docs_per_chunk and docs >= docs_per_chunk) or
                      (bytes_per_chunk and bytes + next_len > bytes_per_chunk)):
            # If adding this action would cause us to go over a limit, spit
            # out the current chunk first:
            yield chunk
            chunk = []
            docs = bytes = 0
        chunk.append(action)
        docs += 1
        bytes += next_len
    if chunk:
        # Yield leftovers at the end.
        yield chunk
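The function is pure, so you can exercise the chunking behavior without a
running cluster. Below is a minimal sketch; the three strings stand in for
the JSON-encoded actions that ``index_op()`` and friends would produce
(their contents are hypothetical, and only their count matters here):

    from pyelasticsearch import bulk_chunks

    actions = ['{"index": {}}\n{"doc": "a"}',
               '{"index": {}}\n{"doc": "b"}',
               '{"index": {}}\n{"doc": "c"}']

    # Cap chunks at two actions apiece: the third action spills over
    # into a second chunk.
    chunks = list(bulk_chunks(actions, docs_per_chunk=2))
    assert chunks == [actions[:2], actions[2:]]

In practice, you feed each chunk straight to ``bulk()``. The sketch below
assumes an ES node at ``http://localhost:9200/``, a hypothetical iterable
``documents`` of dicts, and made-up index and doc-type names:

    from pyelasticsearch import ElasticSearch, bulk_chunks

    es = ElasticSearch('http://localhost:9200/')  # assumed local node

    # Encode each doc as an index action, then send one bulk request per
    # chunk. `documents`, 'blog', and 'post' are placeholders.
    for chunk in bulk_chunks((es.index_op(doc) for doc in documents),
                             docs_per_chunk=500,
                             bytes_per_chunk=100000):
        es.bulk(chunk, index='blog', doc_type='post')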