-
Notifications
You must be signed in to change notification settings - Fork 4
Add parameter to RepublishBuried signature to filter jobs #40
Add parameter to RepublishBuried signature to filter jobs #40
Conversation
queue/memory.go
Outdated
| if err := q.Publish(j); err != nil { | ||
| return err | ||
| } | ||
| // TODO: remove job fom q.buriedJobs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a leftover?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More or less. In fact I'm not sure, before that change it didn't remove the job from the slice too, so idk if is intended, a bug or it doesn't matter.
queue/amqp.go
Outdated
|
|
||
| retries = 0 | ||
| if comply(j) { | ||
| if err = j.Ack(); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should Ack the job after sending it to the other queue, and try to NACK if the publish fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would also reject the jobs just after publishing error or failing compliance. Adding them to a slice to process afterwards can make some of them unprocessed if some error happens.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw. did you think about dead-letter-exchange. we can use it for "non immediately retry".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jfontan I can't reject immediately because I'd send back the job to the queue it's being iterated, so we will end up stuck in an infinite loop.
@kuba, this queue is already the dead-letter-exchange queue, so we would have to configure a buried queue for the buried queue, and republish from the child buried queue to the parent buried queue.
Finally we will have the same situation between those two buried queues. Not sure about it 😕
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So maybe we need an another exchange with counter (and maybe size limit). If we exceed some q's size or timeout messages will be enqueued again but retry counter will be increased. If we got e.g. 5 retries and it's still failing we can drop a message. Maybe it's hard to explain as a comment but maybe just as a suggestion this is what I mean: https://felipeelias.github.io/rabbitmq/2016/02/22/rabbitmq-exponential-backoff.html
This is what me and my colleague (Felipe) implemented long time ago to avoid dropping messages and handle backoff.
queue/amqp.go
Outdated
| func (q *AMQPQueue) RepublishBuried() error { | ||
| // RepublishBuried will republish in the main queue those jobs that timed out without Ack | ||
| // or were Rejected with requeue = False and and makes complay return true. | ||
| func (q *AMQPQueue) RepublishBuried(comply RepublishConditionFunc) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe to do not break the API we should create a new method called RepublishBuriedConditionally with all that logic, and a RepublishBuried calling to RepublishBuriedConditionally with a function that accepts everything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or accept a ...RepublishConditionFunc which would allow several small validation functions and you can still call it as RepublishBuried() thus not breaking API compat
queue/amqp.go
Outdated
| defer iter.Close() | ||
|
|
||
| retries := 0 | ||
| notComplying := make([]*Job, 0, 3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
3 for any specific reason?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, I just thought about having a defined small initial capacity.
Is there some recommendation for those cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just var something []Foo, it won't make much difference in terms of memory usage and/or speed and you don't have to worry about which number to pick
Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
…arameter Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
13d2dbe to
2b23461
Compare
queue/amqp.go
Outdated
| if err = q.Publish(j); err != nil { | ||
| return err | ||
| func handleRepublishErrors(list []*jobErr) error { | ||
| if len(list) > 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why 1? Shouldn't be > 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😳 Yes, it should be!
Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
|
ping @ajnavarro |
ajnavarro
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small minor docs changes and LGTM
queue/amqp.go
Outdated
| } | ||
|
|
||
| // RepublishBuried will republish in the main queue those jobs that timed out without Ack | ||
| // or were Rejected with requeue = False and and makes complay return true. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reword this please; remove duplicated and and complay => comply
ajnavarro
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we should add some tests
|
@ajnavarro I already added a test and, incidentally, caught a 🐛 |
|
|
||
| type republishConditions []RepublishConditionFunc | ||
|
|
||
| func (c republishConditions) comply(job *Job) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't this be simplified?
if len(c) == 0 {
return true
}
for _, condition := range c {
if condition(job) {
return true
}
}
return falseOtherwise LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it looks like much better 👍
Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
Signed-off-by: Manuel Carmona <manu.carmona90@gmail.com>
8c743a1 to
e5c6eaf
Compare
Related to src-d/borges#250