From e9eeeec920460e08b3825322839c54f6b8b40e08 Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Thu, 12 Apr 2018 12:10:00 +0200 Subject: [PATCH 1/4] Organize commands Signed-off-by: Manuel Carmona --- cli/borges/republish.go | 23 +++++++++++++++++++++++ cli/borges/update.go | 23 +++++++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 cli/borges/republish.go create mode 100644 cli/borges/update.go diff --git a/cli/borges/republish.go b/cli/borges/republish.go new file mode 100644 index 00000000..b757b9cd --- /dev/null +++ b/cli/borges/republish.go @@ -0,0 +1,23 @@ +package main + +const ( + republishCmdName = "republish" + republishCmdShortName = "requeue jobs from buried queues" + republishCmdLongDesc = "" +) + +// republishCommand is a producer subcommand. +var republishCommand = &republishCmd{producerSubcmd: newProducerSubcmd( + republishCmdName, + republishCmdShortName, + republishCmdLongDesc, +)} + +type republishCmd struct { + producerSubcmd +} + +func (c *republishCmd) Execute(args []string) error { + log.Warn("Republish command is not implemented yet") + return nil +} diff --git a/cli/borges/update.go b/cli/borges/update.go new file mode 100644 index 00000000..901f4ac7 --- /dev/null +++ b/cli/borges/update.go @@ -0,0 +1,23 @@ +package main + +const ( + updateCmdName = "update" + updateCmdShortName = "update repositories processed previously" + updateCmdLongDesc = "" +) + +// updateCommand is a producer subcommand. +var updateCommand = &updateCmd{producerSubcmd: newProducerSubcmd( + updateCmdName, + updateCmdShortName, + updateCmdLongDesc, +)} + +type updateCmd struct { + producerSubcmd +} + +func (c *updateCmd) Execute(args []string) error { + log.Warn("Update command is not implemented yet") + return nil +} From 669b83cb4c631092f11790abe47f4579b3d6c5da Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Fri, 13 Apr 2018 14:59:27 +0200 Subject: [PATCH 2/4] Add command to republish buried jobs Signed-off-by: Manuel Carmona --- cli/borges/init.go | 4 +-- cli/borges/producer.go | 1 + cli/borges/republish.go | 55 ++++++++++++++++++++++++++++++++++++++--- worker.go | 4 +-- 4 files changed, 56 insertions(+), 8 deletions(-) diff --git a/cli/borges/init.go b/cli/borges/init.go index 9890b86c..e589b92a 100644 --- a/cli/borges/init.go +++ b/cli/borges/init.go @@ -3,8 +3,6 @@ package main import ( "fmt" - "github.com/inconshreveable/log15" - "gopkg.in/src-d/core-retrieval.v0/schema" "gopkg.in/src-d/framework.v0/database" ) @@ -38,7 +36,7 @@ func (c *initCmd) Execute(args []string) error { return fmt.Errorf("unable to create database schema: %s", err) } - log15.Info("database was successfully initialized") + log.Info("database was successfully initialized") return nil } diff --git a/cli/borges/producer.go b/cli/borges/producer.go index 2468e7f3..c353a789 100644 --- a/cli/borges/producer.go +++ b/cli/borges/producer.go @@ -105,6 +105,7 @@ func checkPriority(prio uint8) error { var producerSubcommands = []ExecutableCommand{ mentionsCommand, fileCommand, + republishCommand, } func init() { diff --git a/cli/borges/republish.go b/cli/borges/republish.go index b757b9cd..fdb7bfcc 100644 --- a/cli/borges/republish.go +++ b/cli/borges/republish.go @@ -1,23 +1,72 @@ package main +import ( + "time" + + "github.com/src-d/borges" + "gopkg.in/src-d/framework.v0/queue" +) + const ( republishCmdName = "republish" - republishCmdShortName = "requeue jobs from buried queues" + republishCmdShortDesc = "requeue jobs from buried queues" republishCmdLongDesc = "" ) // republishCommand is a producer subcommand. var republishCommand = &republishCmd{producerSubcmd: newProducerSubcmd( republishCmdName, - republishCmdShortName, + republishCmdShortDesc, republishCmdLongDesc, )} type republishCmd struct { producerSubcmd + + Time string `long:"time" short:"t" default:"0" description:"elapsed time between republish triggers"` } func (c *republishCmd) Execute(args []string) error { - log.Warn("Republish command is not implemented yet") + lapse, err := time.ParseDuration(c.Time) + if err != nil { + return err + } + + if err := c.producerSubcmd.init(); err != nil { + return err + } + defer c.broker.Close() + + log.Info("starting republishing jobs...", "time", c.Time) + + log.Debug("republish task triggered ") + if err := c.queue.RepublishBuried(republishCondition); err != nil { + log.Error("error republishing buried jobs", "error", err) + } + + if lapse != 0 { + c.runPeriodically(lapse) + } + + log.Info("stopping republishing jobs") return nil } + +func republishCondition(job *queue.Job) bool { + // Althoug the job has the temporary error tag, it must be checked + // that the retries is equals to zero. The reason for this is that + // a job can panic during a retry process, so it can be tagged as + // temporary error and a number of retries greater than zero reveals + // that fact. + return job.ErrorType == borges.TemporaryError && job.Retries == 0 +} + +func (c *republishCmd) runPeriodically(lapse time.Duration) { + ticker := time.Tick(lapse) + for range ticker { + log.Debug("republish task triggered ") + if err := c.queue.RepublishBuried(republishCondition); err != nil { + log.Error("error republishing buried jobs", "error", err) + } + } +} diff --git a/worker.go b/worker.go index 7ec6702a..089ce322 100644 --- a/worker.go +++ b/worker.go @@ -4,7 +4,7 @@ import ( "github.com/inconshreveable/log15" ) -const temporaryError = "temporary" +const TemporaryError = "temporary" // Worker is a worker that processes jobs from a channel. type Worker struct { @@ -63,7 +63,7 @@ func (w *Worker) Start() { if requeue { job.queueJob.Retries-- - job.queueJob.ErrorType = temporaryError + job.queueJob.ErrorType = TemporaryError job.source.Publish(job.queueJob) } From cc1803aea2b67eb9d349dd75f4e2af2a8b0ea19e Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Tue, 17 Apr 2018 10:45:25 +0200 Subject: [PATCH 3/4] fix unhandled error in worker Signed-off-by: Manuel Carmona --- worker.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/worker.go b/worker.go index 089ce322..42c241f7 100644 --- a/worker.go +++ b/worker.go @@ -64,7 +64,14 @@ func (w *Worker) Start() { if requeue { job.queueJob.Retries-- job.queueJob.ErrorType = TemporaryError - job.source.Publish(job.queueJob) + if err := job.source.Publish(job.queueJob); err != nil { + log.Error("error publishing job back to the main queue", "error", err) + if err := job.queueJob.Reject(false); err != nil { + log.Error("error rejecting job", "error", err) + } + + continue + } } if err := job.queueJob.Ack(); err != nil { From 356c123947c83a8febfad64ad79d587f36dc0934 Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Fri, 20 Apr 2018 16:40:55 +0200 Subject: [PATCH 4/4] Update src-d/framework dependency Signed-off-by: Manuel Carmona --- Gopkg.lock | 4 +- Gopkg.toml | 2 +- .../gopkg.in/src-d/framework.v0/.travis.yml | 3 +- .../gopkg.in/src-d/framework.v0/queue/amqp.go | 50 +++++++++++++++--- .../src-d/framework.v0/queue/amqp_test.go | 52 +++++++++++++++++++ .../src-d/framework.v0/queue/common.go | 24 ++++++++- .../src-d/framework.v0/queue/memory.go | 12 +++-- 7 files changed, 132 insertions(+), 15 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 0de8f137..de7adad0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -338,7 +338,7 @@ "lock", "queue" ] - revision = "0839a41303d85dd722c2bf23106f01e0f6758115" + revision = "88f5399ee0037158edaef606ec57b7037a80c02a" source = "https://github.com/src-d/framework.git" [[projects]] @@ -461,6 +461,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "d5f3e33031a3bdc64909673b95d95bc20bce0f4a4bc695f9fd9a024b2e33a45e" + inputs-digest = "166dc6fbcc7d1e8cffdf77291f3176d8f7fa48ad75179fcb650cd7f63359cf15" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index bca62715..fa760430 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -17,7 +17,7 @@ [[constraint]] name = "gopkg.in/src-d/framework.v0" - revision = "0839a41303d85dd722c2bf23106f01e0f6758115" + revision = "88f5399ee0037158edaef606ec57b7037a80c02a" source = "https://github.com/src-d/framework.git" [[constraint]] diff --git a/vendor/gopkg.in/src-d/framework.v0/.travis.yml b/vendor/gopkg.in/src-d/framework.v0/.travis.yml index 9897c577..8c252b24 100644 --- a/vendor/gopkg.in/src-d/framework.v0/.travis.yml +++ b/vendor/gopkg.in/src-d/framework.v0/.travis.yml @@ -1,7 +1,8 @@ language: go go: - - 1.9 + - 1.9.x + - 1.10.x - tip go_import_path: gopkg.in/src-d/framework.v0 diff --git a/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go b/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go index 6ed04b3d..de86a589 100644 --- a/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go +++ b/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go @@ -3,6 +3,7 @@ package queue import ( "fmt" "os" + "strings" "sync" "sync/atomic" "time" @@ -18,6 +19,7 @@ var ( ErrConnectionFailed = errors.NewKind("failed to connect to RabbitMQ: %s") ErrOpenChannel = errors.NewKind("failed to open a channel: %s") ErrRetrievingHeader = errors.NewKind("error retrieving '%s' header from message %s") + ErrRepublishingJobs = errors.NewKind("couldn't republish some jobs : %s") ) const ( @@ -288,9 +290,14 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error { ) } -// RepublishBuried will republish in the main queue all the jobs that timed out without Ack -// or were Rejected with requeue = False. -func (q *AMQPQueue) RepublishBuried() error { +type jobErr struct { + job *Job + err error +} + +// RepublishBuried will republish in the main queue those jobs that timed out without Ack +// or were Rejected with requeue = False and makes comply return true. +func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error { if q.buriedQueue == nil { return fmt.Errorf("buriedQueue is nil, called RepublishBuried on the internal buried queue?") } @@ -304,6 +311,8 @@ func (q *AMQPQueue) RepublishBuried() error { defer iter.Close() retries := 0 + var notComplying []*Job + var errorsPublishing []*jobErr for { j, err := iter.(*AMQPJobIter).nextNonBlocking() if err != nil { @@ -316,7 +325,7 @@ func (q *AMQPQueue) RepublishBuried() error { // if there is nothing after all the retries (meaning: BuriedQueue is surely // empty or any arriving jobs will have to wait to the next call). if retries > buriedNonBlockingRetries { - return nil + break } time.Sleep(50 * time.Millisecond) @@ -324,16 +333,45 @@ func (q *AMQPQueue) RepublishBuried() error { continue } + retries = 0 + if err = j.Ack(); err != nil { return err } - retries = 0 + if republishConditions(conditions).comply(j) { + if err = q.Publish(j); err != nil { + errorsPublishing = append(errorsPublishing, &jobErr{j, err}) + } + } else { + notComplying = append(notComplying, j) + + } + } - if err = q.Publish(j); err != nil { + for _, job := range notComplying { + if err = job.Reject(true); err != nil { return err } } + + return q.handleRepublishErrors(errorsPublishing) +} + +func (q *AMQPQueue) handleRepublishErrors(list []*jobErr) error { + if len(list) > 0 { + stringErrors := []string{} + for _, je := range list { + stringErrors = append(stringErrors, je.err.Error()) + if err := q.buriedQueue.Publish(je.job); err != nil { + return err + } + } + + return ErrRepublishingJobs.New(strings.Join(stringErrors, ": ")) + } + + return nil } // Transaction executes the given callback inside a transaction. diff --git a/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go b/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go index 7967c80e..f6f229cb 100644 --- a/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go +++ b/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go @@ -3,6 +3,7 @@ package queue import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -144,3 +145,54 @@ func TestAMQPHeaders(t *testing.T) { }) } } + +func TestAMQPRepublishBuried(t *testing.T) { + broker, err := NewBroker(testAMQPURI) + require.NoError(t, err) + defer func() { require.NoError(t, broker.Close()) }() + + queueName := newName() + queue, err := broker.Queue(queueName) + require.NoError(t, err) + + amqpQueue, ok := queue.(*AMQPQueue) + require.True(t, ok) + + buried := amqpQueue.buriedQueue + + tests := []struct { + name string + payload string + }{ + {name: "message 1", payload: "payload 1"}, + {name: "message 2", payload: "republish"}, + {name: "message 3", payload: "payload 3"}, + {name: "message 3", payload: "payload 4"}, + } + + for _, test := range tests { + job, err := NewJob() + require.NoError(t, err) + + job.raw = []byte(test.payload) + + err = buried.Publish(job) + require.NoError(t, err) + time.Sleep(1 * time.Second) + } + + var condition RepublishConditionFunc = func(j *Job) bool { + return string(j.raw) == "republish" + } + + err = queue.RepublishBuried(condition) + require.NoError(t, err) + + jobIter, err := queue.Consume(1) + require.NoError(t, err) + defer func() { require.NoError(t, jobIter.Close()) }() + + job, err := jobIter.Next() + require.NoError(t, err) + require.Equal(t, string(job.raw), "republish") +} diff --git a/vendor/gopkg.in/src-d/framework.v0/queue/common.go b/vendor/gopkg.in/src-d/framework.v0/queue/common.go index afc614da..0269dc2c 100644 --- a/vendor/gopkg.in/src-d/framework.v0/queue/common.go +++ b/vendor/gopkg.in/src-d/framework.v0/queue/common.go @@ -68,6 +68,25 @@ func NewBroker(uri string) (Broker, error) { // TxCallback is a function to be called in a transaction. type TxCallback func(q Queue) error +// RepublishConditionFunc is a function used to filter jobs to republish. +type RepublishConditionFunc func(job *Job) bool + +type republishConditions []RepublishConditionFunc + +func (c republishConditions) comply(job *Job) bool { + if len(c) == 0 { + return true + } + + for _, condition := range c { + if condition(job) { + return true + } + } + + return false +} + // Queue represents a message queue. type Queue interface { // Publish publishes the given Job to the queue. @@ -80,8 +99,9 @@ type Queue interface { // number of undelivered jobs the iterator will allow at any given // time (see the Acknowledger interface). Consume(advertisedWindow int) (JobIter, error) - // RepublishBuried republish all jobs in the buried queue to the main one - RepublishBuried() error + // RepublishBuried republish to the main queue those jobs complying + // one of the conditions, leaving the rest of them in the buried queue. + RepublishBuried(conditions ...RepublishConditionFunc) error } // JobIter represents an iterator over a set of Jobs. diff --git a/vendor/gopkg.in/src-d/framework.v0/queue/memory.go b/vendor/gopkg.in/src-d/framework.v0/queue/memory.go index 07421e13..26f717ed 100644 --- a/vendor/gopkg.in/src-d/framework.v0/queue/memory.go +++ b/vendor/gopkg.in/src-d/framework.v0/queue/memory.go @@ -64,9 +64,15 @@ func (q *memoryQueue) PublishDelayed(j *Job, delay time.Duration) error { return nil } -func (q *memoryQueue) RepublishBuried() error { - for _, j := range q.buriedJobs { - q.Publish(j) +// RepublishBuried implement the Queue interface. +func (q *memoryQueue) RepublishBuried(conditions ...RepublishConditionFunc) error { + for _, job := range q.buriedJobs { + if republishConditions(conditions).comply(job) { + job.ErrorType = "" + if err := q.Publish(job); err != nil { + return err + } + } } return nil }