AWS – SQS

 

SQS is a messaging system similar to ActiveMQ or RabbitMQ. It is a temporary repository for messages that need to be processed but the server is busy processing other messages. SQS is fully managed by AWS. You don’t have to manage middleware or servers. Using SQS, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available.

SQS is pull-based and not push-based. Messages can live in the queue from 1 minute to 14 days. The default retention period is 4 days.

Visibility timeout is the amount of time that a message is invisible while processing. If a message is not processed between the visibility timeout then it will become available again for another reader to pick up and process. This can result in one message being processed more than once. Default visibility timeout is 30 seconds. Maximum visibility timeout is 12 hours. Increase your visibility timeout according to your needs.

Messages can contain up to 256KB of text in any format. SQS solves the issue where messages or events are more than what the server can process. It will store the messages and hand them over to the server when the server is ready for them.

There are two queue types which are Standard Queues and FIFO Queues.

Standard Queue

Standard queues guarantee that a message is delivered once but occasionally a message might get delivered twice. Messages within standard queues are generally delivered in the order they are sent.

You can set delay seconds which tells SQS to wait for a certain amount of seconds before exposing the message to the readers.

Standard queues can serve unlimited number of messages.

FIFO Queue

FIFO which stands for first-in-first-out delivers and processes messages in the order they are sent. There are no duplicates for this type of queue.

FIFO queues support up to 3,000 messages per second with batching , or up to 300 messages per second (300 send, receive, or delete operations per second) without batching. You can submit a support ticket to AWS to increase this limit.

MessageGroupId groups messages that belong together and is processed in a FIFO manner. Other messages that don’t have the same messageGroupId might not be in order.

MessageDeduplicationId is used to make sure messages are unique and that messages are not sent into the queue multiple times. If a message with a particular MessageDeduplicationId is sent successfully, any messages sent with the same MessageDeduplicationId are accepted successfully but aren’t delivered during the 5-minute deduplication interval.

DelaySeconds attribute can only be set on the queue level for FIFO.

Long Polling

Long polling doesn’t return a response unless there is a message to process or the long polling timeouts. Long polling helps you save money as your API might make less frequent calls to the AWS SQS API. The recommended timeout is 20 seconds but can be more. The longer the timeout the less likely your API call will return an empty response which more like will result in saving resources.

Using SQS Locally

At the time of writing, I have not found a way that works consistently to run SQS on my local machine. I have tried many approaches people put on the internet but none worked for me. For local development, I use a set of queues different from the ones I use for other environments like Dev, QA, or production. This has been the best solution for me. If you find a solution that works for you please leave a link in the comment section.

Spring project example for sending an email after a user has signed up on your application.

	@Bean
	public AWSCredentialsProvider amazonAWSCredentialsProvider() {
		return new ProfileCredentialsProvider("folauk100-dev");
	}
	
	@Bean
	public AmazonSQS amazonSQSMain() {
		AmazonSQS sqs = AmazonSQSClientBuilder.standard().withCredentials(amazonAWSCredentialsProvider())
				.withRegion(Regions.US_WEST_2).build();
		return sqs;
	}

How to create both FIFO queue and Standard queue

public String createQueue(String name) {
		CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(name);
		CreateQueueResult result = amazonSQS.createQueue(createQueueRequest);
		log.debug("queue url: " + result.getQueueUrl());
		log.debug("queue: " + result.toString());
		return result.getQueueUrl();
	}

	/**
	 * Set delaySeconds if needed as it would be able to do at message sending.
	 * @param name
	 * @return
	 */
	public String createFIFOQueue(String name) {
		final Map<String, String> attributes = new HashMap<>();
		attributes.put("FifoQueue", "true");
		attributes.put("ContentBasedDeduplication", "true");
		CreateQueueRequest createQueueRequest = new CreateQueueRequest()
				.withQueueName(name + ".fifo")
				.withAttributes(attributes);
		CreateQueueResult result = amazonSQS.createQueue(createQueueRequest);
		log.debug("queue url: " + result.getQueueUrl());
		log.debug("queue: " + result.toString());
		return result.getQueueUrl();
	}

Send messages to a queue

	public boolean sendMessage(String queueUrl, SQSMessage msg, int delaySeconds, Map<String, MessageAttributeValue> messageAttributes) {
		
		if(msg==null) {
			throw new RuntimeException("msg is empty");
		}
		
		SendMessageRequest sendMessageRequest = new SendMessageRequest();

		sendMessageRequest.withMessageBody(msg.toJson());
		sendMessageRequest.withQueueUrl(queueUrl);
		
		if(messageAttributes!=null) {
			log.debug("messageAttributes set!");
			sendMessageRequest.withMessageAttributes(messageAttributes);
		}
		
		if(delaySeconds>0) {
			sendMessageRequest.withDelaySeconds(delaySeconds);
		}
		
		log.debug("messageAttributes={}",ObjectUtils.toJson(sendMessageRequest.getMessageAttributes()));
		
		SendMessageResult sendMessageResult = amazonSQS.sendMessage(sendMessageRequest);

		String resultId = sendMessageResult.getMessageId();

		log.info("resultId={}", resultId);

		return true;
	}

 

Receive messages

@Async
public void processAccountQueue() {
		int waitingSeconds = 10;
		String queueUrl = SQSQueue.ACCOUNTS_QUEUE_URL;

		final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
		receiveMessageRequest.withWaitTimeSeconds(waitingSeconds);

		while (true) {

			final List<Message> messages = amazonSQS.receiveMessage(receiveMessageRequest).getMessages();
			log.debug("polling {}", queueUrl);

			if (messages != null && messages.size() > 0) {
				for (final Message message : messages) {
					log.debug("Message Received");

					SQSMessage sqsMessage = SQSMessage.fromJson(message.getBody());

					int retryCount = 0;
                                        
					while (true) {
						try {
							handleAccountMessage(sqsMessage);
							break;
						} catch (Exception e) {
							if (retryCount == 5) {
								break;
							}
							log.debug("retryCount={}", retryCount);
							retryCount++;
						}
					}

					// delete message once it's done
					DeleteMessageResult deleteMessageResult = amazonSQS
							.deleteMessage(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));

					log.debug("delete message response={}",
							ObjectUtils.toJson(deleteMessageResult.getSdkResponseMetadata()));

				}
			} else {
				log.debug("{} is empty", queueUrl);
			}
		}
	}

Source code on Github

 

SQS with AWS CLI

Get a list of Queues

aws sqs list-queues --profile {profile-name}

View messages from a queue

aws sqs receive-message --queue-url {queue-url} --profile {profile-name} --attribute-names All --message-attribute-names All --max-number-of-messages 10

Empty all messages from a queue

aws sqs purge-queue --queue-url {queue-url} --profile {profile-name}                                             

Send message to a queue

aws sqs send-message --queue-url {queue-url} --message-body "test body." --delay-seconds 10 --message-attributes file://message.json

Delete message from a queue

aws sqs delete-message --queue-url {queue-url} --receipt-handle {messageReceiptHandle}

 

Visibility timeout

Immediately after a message is received, it remains in the queue. To prevent other consumers from processing the message again, Amazon SQS sets a visibility timeout, a period of time during which Amazon SQS prevents other consumers from receiving and processing the message. The default visibility timeout for a message is 30 seconds. The minimum is 0 seconds. The maximum is 12 hours.

aws sqs change-message-visibility --queue-url {queue-url} --receipt-handle {messageReceiptHandle} --visibility-timeout 36000

 

If there are two readers or listeners listening to a particular queue, only one of the readers can process a message from the queue at any given time.

AWS SQS developer guide

 




Subscribe To Our Newsletter
You will receive our latest post and tutorial.
Thank you for subscribing!

required
required


Leave a Reply

Your email address will not be published. Required fields are marked *