Skip to content
This repository was archived by the owner on Sep 28, 2018. It is now read-only.

Conversation

@mcarmonaa
Copy link
Contributor

Related to src-d/borges#250

queue/memory.go Outdated
if err := q.Publish(j); err != nil {
return err
}
// TODO: remove job fom q.buriedJobs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a leftover?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More or less. In fact I'm not sure, before that change it didn't remove the job from the slice too, so idk if is intended, a bug or it doesn't matter.

queue/amqp.go Outdated

retries = 0
if comply(j) {
if err = j.Ack(); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should Ack the job after sending it to the other queue, and try to NACK if the publish fails.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also reject the jobs just after publishing error or failing compliance. Adding them to a slice to process afterwards can make some of them unprocessed if some error happens.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw. did you think about dead-letter-exchange. we can use it for "non immediately retry".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jfontan I can't reject immediately because I'd send back the job to the queue it's being iterated, so we will end up stuck in an infinite loop.

@kuba, this queue is already the dead-letter-exchange queue, so we would have to configure a buried queue for the buried queue, and republish from the child buried queue to the parent buried queue.

Finally we will have the same situation between those two buried queues. Not sure about it 😕

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So maybe we need an another exchange with counter (and maybe size limit). If we exceed some q's size or timeout messages will be enqueued again but retry counter will be increased. If we got e.g. 5 retries and it's still failing we can drop a message. Maybe it's hard to explain as a comment but maybe just as a suggestion this is what I mean: https://felipeelias.github.io/rabbitmq/2016/02/22/rabbitmq-exponential-backoff.html
This is what me and my colleague (Felipe) implemented long time ago to avoid dropping messages and handle backoff.

queue/amqp.go Outdated
func (q *AMQPQueue) RepublishBuried() error {
// RepublishBuried will republish in the main queue those jobs that timed out without Ack
// or were Rejected with requeue = False and and makes complay return true.
func (q *AMQPQueue) RepublishBuried(comply RepublishConditionFunc) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe to do not break the API we should create a new method called RepublishBuriedConditionally with all that logic, and a RepublishBuried calling to RepublishBuriedConditionally with a function that accepts everything.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or accept a ...RepublishConditionFunc which would allow several small validation functions and you can still call it as RepublishBuried() thus not breaking API compat

queue/amqp.go Outdated
defer iter.Close()

retries := 0
notComplying := make([]*Job, 0, 3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 for any specific reason?

Copy link
Contributor Author

@mcarmonaa mcarmonaa Apr 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, I just thought about having a defined small initial capacity.
Is there some recommendation for those cases?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just var something []Foo, it won't make much difference in terms of memory usage and/or speed and you don't have to worry about which number to pick

Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
…arameter

Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
@mcarmonaa mcarmonaa force-pushed the improvement/queue-republish-buried-filter branch from 13d2dbe to 2b23461 Compare April 13, 2018 06:06
queue/amqp.go Outdated
if err = q.Publish(j); err != nil {
return err
func handleRepublishErrors(list []*jobErr) error {
if len(list) > 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 1? Shouldn't be > 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😳 Yes, it should be!

Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
@mcarmonaa mcarmonaa changed the title Add paramanter to RepublishBuried signature to filter jobs Add parameter to RepublishBuried signature to filter jobs Apr 18, 2018
@mcarmonaa
Copy link
Contributor Author

ping @ajnavarro

Copy link
Contributor

@ajnavarro ajnavarro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small minor docs changes and LGTM

queue/amqp.go Outdated
}

// RepublishBuried will republish in the main queue those jobs that timed out without Ack
// or were Rejected with requeue = False and and makes complay return true.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reword this please; remove duplicated and and complay => comply

Copy link
Contributor

@ajnavarro ajnavarro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we should add some tests

@mcarmonaa
Copy link
Contributor Author

@ajnavarro I already added a test and, incidentally, caught a 🐛


type republishConditions []RepublishConditionFunc

func (c republishConditions) comply(job *Job) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this be simplified?

if len(c) == 0 {
  return true
}

for _, condition := range c {
  if condition(job) {
    return true
  }
}

return false

Otherwise LGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it looks like much better 👍

Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
@mcarmonaa mcarmonaa force-pushed the improvement/queue-republish-buried-filter branch from 8c743a1 to e5c6eaf Compare April 20, 2018 13:49
@mcarmonaa mcarmonaa merged commit 88f5399 into src-d:master Apr 20, 2018
@mcarmonaa mcarmonaa deleted the improvement/queue-republish-buried-filter branch April 20, 2018 14:29
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants