kombu.transport.SQS
Amazon SQS transport module for Kombu. This package implements an AMQP-like
interface on top of Amazon's SQS service, with the goal of being optimized for
high performance and reliability.
The default settings for this module are currently tuned for high performance in
task queue situations where tasks are small, idempotent, and run very fast.
SQS features supported by this transport:
- Long Polling:
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
Long polling is enabled by setting the wait_time_seconds transport
option to a number greater than 0. Amazon supports up to 20 seconds. This is
disabled for now, but will be enabled by default in the near future.
- Batch API Actions:
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-batch-api.html
The default behavior of the SQS Channel.drain_events() method is to
request up to the ‘prefetch_count’ messages on every request to SQS.
These messages are stored locally in a deque object and passed back
to the Transport until the deque is empty, before triggering a new
API call to Amazon.
This behavior dramatically increases the rate at which you can pull tasks
from SQS when you have short-running tasks (or a large number of workers).
When a Celery worker has multiple queues to monitor, it will pull down
up to ‘prefetch_count’ messages from queueA and work on them all before
moving on to queueB. If queueB is empty, it will wait until
‘polling_interval’ expires before moving back and checking on queueA.
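The buffering described above can be illustrated with a small, self-contained model. This is a sketch, not kombu's implementation: `BufferedQueueReader`, `drain_one`, and `fake_fetch` are hypothetical names, the fetch function stands in for a batched SQS receive call, and the queue ordering is simplified to always start from the first queue.

```python
from collections import deque
from queue import Empty


class BufferedQueueReader:
    """Toy model of prefetch buffering (hypothetical, not kombu's code)."""

    def __init__(self, fetch_batch, prefetch_count=10):
        # fetch_batch(queue_name, max_messages) -> list of messages.
        # It is a stand-in for one batched receive request to SQS.
        self._fetch_batch = fetch_batch
        self._prefetch_count = prefetch_count
        self._buffer = deque()
        self.api_calls = 0  # counts simulated requests to the service

    def drain_one(self, queue_names):
        """Return one message, refilling the buffer only when it is empty."""
        if not self._buffer:
            for name in queue_names:
                self.api_calls += 1
                batch = self._fetch_batch(name, self._prefetch_count)
                if batch:
                    self._buffer.extend(batch)
                    break  # work through this batch before polling again
        if not self._buffer:
            raise Empty()
        return self._buffer.popleft()


# In-memory stand-in for two SQS queues.
backlog = {"queueA": ["a1", "a2", "a3"], "queueB": ["b1"]}


def fake_fetch(queue_name, max_messages):
    pending = backlog[queue_name]
    batch, backlog[queue_name] = pending[:max_messages], pending[max_messages:]
    return batch


reader = BufferedQueueReader(fake_fetch, prefetch_count=2)
received = [reader.drain_one(["queueA", "queueB"]) for _ in range(4)]
print(received)          # ['a1', 'a2', 'a3', 'b1']
print(reader.api_calls)  # 4
```

In this model the second message is served from the local deque without a new request, which is where the speed-up for short-running tasks comes from.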
-
class kombu.transport.SQS.Transport(client, **kwargs)
-
class Channel(*args, **kwargs)
-
class Table(connection=None, name=None)
Amazon SimpleDB domain describing the message routing table.
-
create_binding(queue)
Get binding item for queue.
Creates the item if it doesn’t exist.
-
exchange_delete(exchange)
Delete all routes for exchange.
-
get_exchanges()
-
get_item(item_name)
Uses consistent_read by default.
-
get_queue(queue)
Get binding for queue.
-
queue_bind(exchange, routing_key, pattern, queue)
-
queue_delete(queue)
Delete queue by name.
-
routes_for(exchange)
Iterator giving all routes for an exchange.
-
select(query='', next_token=None, consistent_read=True, max_items=None)
Uses consistent_read by default.
-
Transport.Channel.basic_ack(delivery_tag)
-
Transport.Channel.basic_cancel(consumer_tag)
-
Transport.Channel.basic_consume(queue, no_ack, *args, **kwargs)
-
Transport.Channel.close()
-
Transport.Channel.conninfo
-
Transport.Channel.default_region = 'us-east-1'
-
Transport.Channel.default_visibility_timeout = 1800
-
Transport.Channel.default_wait_time_seconds = 0
-
Transport.Channel.domain_format = 'kombu%(vhost)s'
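As a brief illustration, the format string above uses Python's `%`-style mapping substitution. The helper name `domain_name` and the example virtual host are hypothetical, and exactly which value the channel substitutes for `vhost` is an assumption here.

```python
DOMAIN_FORMAT = "kombu%(vhost)s"  # value of domain_format shown above


def domain_name(vhost):
    """Expand the format string with a virtual host name (illustrative only)."""
    return DOMAIN_FORMAT % {"vhost": vhost}


print(domain_name("staging"))  # kombustaging
```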
-
Transport.Channel.drain_events(timeout=None)
Return a single payload message from one of our queues.
Raises Empty: if no messages are available.
-
Transport.Channel.entity_name(name, table={33: 95, 34: 95, 35: 95, 36: 95, 37: 95, 38: 95, 39: 95, 40: 95, 41: 95, 42: 95, 43: 95, 44: 95, 46: 45, 47: 95, 58: 95, 59: 95, 60: 95, 61: 95, 62: 95, 63: 95, 64: 95, 91: 95, 92: 95, 93: 95, 94: 95, 96: 95, 123: 95, 124: 95, 125: 95, 126: 95})
Format AMQP queue name into a legal SQS queue name.
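The default `table` in the signature above maps code points to replacements: most ASCII punctuation becomes `_` (95) and `.` (46) becomes `-` (45). A minimal sketch of how such a table could be built and applied with `str.translate` follows; it reproduces the documented mapping but is not claimed to be kombu's actual code, and the sample queue names are made up.

```python
import string

# Every ASCII punctuation character except '-', '_' and '.' maps to '_' (95).
CHARS_REPLACE_TABLE = {
    ord(char): 0x5F for char in string.punctuation if char not in "-_."
}
# '.' (46) maps to '-' (45), as in the documented default table.
CHARS_REPLACE_TABLE[0x2E] = 0x2D


def entity_name(name, table=CHARS_REPLACE_TABLE):
    """Translate a queue name using the code-point table."""
    return str(name).translate(table)


print(entity_name("celery.pidbox"))        # celery-pidbox
print(entity_name("worker@host.example"))  # worker_host-example
```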
-
Transport.Channel.exchange_delete(exchange, **kwargs)
Delete exchange by name.
-
Transport.Channel.get_exchanges()
-
Transport.Channel.get_table(exchange)
Get routing table.
Retrieved from SDB if supports_fanout.
-
Transport.Channel.queue_bind(queue, exchange=None, routing_key='', arguments=None, **kwargs)
-
Transport.Channel.queue_name_prefix
-
Transport.Channel.region
-
Transport.Channel.sdb
-
Transport.Channel.sqs
-
Transport.Channel.supports_fanout
-
Transport.Channel.table
-
Transport.Channel.transport_options
-
Transport.Channel.visibility_timeout
-
Transport.Channel.wait_time_seconds
-
Transport.channel_errors = (<class 'amqp.exceptions.ChannelError'>, <class 'boto.exception.SQSDecodeError'>)
-
Transport.connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'boto.exception.SQSError'>, <class 'socket.error'>)
-
Transport.default_port = None
-
Transport.driver_name = 'sqs'
-
Transport.driver_type = 'sqs'
-
Transport.polling_interval = 1
-
Transport.wait_time_seconds = 0
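The defaults listed above (region, visibility timeout, wait time, polling interval) can be overridden through transport options. The helper below is a hypothetical convenience, not part of kombu: it only assembles and validates a dictionary, using the defaults documented on this page and the 20-second long-polling limit mentioned earlier.

```python
def sqs_transport_options(wait_time_seconds=0, visibility_timeout=1800,
                          polling_interval=1, region="us-east-1"):
    """Build a transport_options dict (hypothetical helper, for illustration)."""
    if not 0 <= wait_time_seconds <= 20:
        raise ValueError("wait_time_seconds must be between 0 and 20")
    return {
        "wait_time_seconds": wait_time_seconds,    # values above 0 enable long polling
        "visibility_timeout": visibility_timeout,  # seconds
        "polling_interval": polling_interval,      # seconds between polls
        "region": region,
    }


options = sqs_transport_options(wait_time_seconds=20)
print(options["wait_time_seconds"])  # 20

# With kombu installed and AWS credentials configured, the dictionary would
# typically be passed along these lines:
#   from kombu import Connection
#   connection = Connection("sqs://", transport_options=options)
```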
-
class kombu.transport.SQS.Channel(*args, **kwargs)
-
class Table(connection=None, name=None)
Amazon SimpleDB domain describing the message routing table.
-
create_binding(queue)
Get binding item for queue.
Creates the item if it doesn’t exist.
-
exchange_delete(exchange)
Delete all routes for exchange.
-
get_exchanges()
-
get_item(item_name)
Uses consistent_read by default.
-
get_queue(queue)
Get binding for queue.
-
queue_bind(exchange, routing_key, pattern, queue)
-
queue_delete(queue)
Delete queue by name.
-
routes_for(exchange)
Iterator giving all routes for an exchange.
-
select(query='', next_token=None, consistent_read=True, max_items=None)
Uses consistent_read by default.
-
Channel.basic_ack(delivery_tag)
-
Channel.basic_cancel(consumer_tag)
-
Channel.basic_consume(queue, no_ack, *args, **kwargs)
-
Channel.close()
-
Channel.conninfo
-
Channel.default_region = 'us-east-1'
-
Channel.default_visibility_timeout = 1800
-
Channel.default_wait_time_seconds = 0
-
Channel.domain_format = 'kombu%(vhost)s'
-
Channel.drain_events(timeout=None)
Return a single payload message from one of our queues.
Raises Empty: if no messages are available.
-
Channel.entity_name(name, table={33: 95, 34: 95, 35: 95, 36: 95, 37: 95, 38: 95, 39: 95, 40: 95, 41: 95, 42: 95, 43: 95, 44: 95, 46: 45, 47: 95, 58: 95, 59: 95, 60: 95, 61: 95, 62: 95, 63: 95, 64: 95, 91: 95, 92: 95, 93: 95, 94: 95, 96: 95, 123: 95, 124: 95, 125: 95, 126: 95})
Format AMQP queue name into a legal SQS queue name.
-
Channel.exchange_delete(exchange, **kwargs)
Delete exchange by name.
-
Channel.get_exchanges()
-
Channel.get_table(exchange)
Get routing table.
Retrieved from SDB if supports_fanout.
-
Channel.queue_bind(queue, exchange=None, routing_key='', arguments=None, **kwargs)
-
Channel.queue_name_prefix
-
Channel.region
-
Channel.sdb
-
Channel.sqs
-
Channel.supports_fanout
-
Channel.table
-
Channel.transport_options
-
Channel.visibility_timeout
-
Channel.wait_time_seconds