From 4012f60cb6e58a6788407930654324e961bdea05 Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Tue, 3 Apr 2018 19:52:10 +0200 Subject: [PATCH 1/8] Add support for priority in producer Signed-off-by: Javi Fontan --- README.md | 4 ++++ cli/borges/producer.go | 17 ++++++++++++++++- producer.go | 11 ++++++++++- producer_test.go | 5 +++-- 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 75c1fe28..e3a2bc7f 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,10 @@ http://github.com/c/repo3 http://github.com/d/repo4.git ``` +You can change the priority of jobs produced with `--priority` option. It is a number from 0 to 8 where 0 is the lowest priority: + + borges producer --source=file --file /path/to/file --priority 8 + When jobs fail they're sent to the buried queue. If you want to requeue them, you can pass the `--republish-buried` flag (this only works for the `mentions` source). For example: ``` diff --git a/cli/borges/producer.go b/cli/borges/producer.go index c10f60a6..15454164 100644 --- a/cli/borges/producer.go +++ b/cli/borges/producer.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "os" @@ -24,11 +25,25 @@ type producerCmd struct { MentionsQueue string `long:"mentionsqueue" default:"rovers" description:"queue name used to obtain mentions if the source type is 'mentions'"` File string `long:"file" description:"path to a file to read URLs from, used with --source=file"` RepublishBuried bool `long:"republish-buried" description:"republishes again all buried jobs before starting to listen for mentions, used with --source=mentions"` + Priority uint8 `long:"priority" default:"4" description:"priority used to enqueue jobs, goes from 0 (lowest) to 8 (highest)"` +} + +func checkPriority(prio uint8) error { + if prio > 8 { + return errors.New("Priority must be between 0 and 8") + } + + return nil } func (c *producerCmd) Execute(args []string) error { c.init() + err := checkPriority(c.Priority) + if err != nil { + return err + } + b := core.Broker() defer b.Close() q, err := b.Queue(c.Queue) @@ -42,7 +57,7 @@ func (c *producerCmd) Execute(args []string) error { } defer ioutil.CheckClose(ji, &err) - p := borges.NewProducer(log, ji, q) + p := borges.NewProducer(log, ji, q, queue.Priority(c.Priority)) p.Start() return err diff --git a/producer.go b/producer.go index 02f51476..756cb547 100644 --- a/producer.go +++ b/producer.go @@ -18,19 +18,26 @@ type Producer struct { running bool startOnce *sync.Once stopOnce *sync.Once + priority queue.Priority // used by Stop() to wait until Start() has finished startIsRunning chan struct{} } // NewProducer creates a new producer. -func NewProducer(log log15.Logger, jobIter JobIter, queue queue.Queue) *Producer { +func NewProducer( + log log15.Logger, + jobIter JobIter, + queue queue.Queue, + prio queue.Priority, +) *Producer { return &Producer{ log: log.New("mode", "producer"), jobIter: jobIter, queue: queue, startOnce: &sync.Once{}, stopOnce: &sync.Once{}, + priority: prio, } } @@ -95,6 +102,8 @@ func (p *Producer) add(j *Job) error { return err } + qj.SetPriority(p.priority) + return p.queue.Publish(qj) } diff --git a/producer_test.go b/producer_test.go index 3c11ee8b..1de6598b 100644 --- a/producer_test.go +++ b/producer_test.go @@ -38,7 +38,8 @@ func (s *ProducerSuite) SetupSuite() { func (s *ProducerSuite) newProducer() *Producer { storer := storage.FromDatabase(s.DB) - return NewProducer(log15.New(), NewMentionJobIter(s.mentionsQueue, storer), s.queue) + return NewProducer(log15.New(), NewMentionJobIter(s.mentionsQueue, storer), + s.queue, queue.PriorityNormal) } func (s *ProducerSuite) newJob() *queue.Job { @@ -112,7 +113,7 @@ func (s *ProducerSuite) TestStartStop_TwoEqualsJobs() { } func (s *ProducerSuite) TestStartStop_ErrorNoNotifier() { - p := NewProducer(log15.New(), &DummyJobIter{}, s.queue) + p := NewProducer(log15.New(), &DummyJobIter{}, s.queue, queue.PriorityNormal) go p.Start() From f6dea18506c55b00f1ada14e478e1b05b167678d Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Thu, 5 Apr 2018 16:33:24 +0200 Subject: [PATCH 2/8] Update framework dependency This version supports priorities in AMQP queues Signed-off-by: Javi Fontan --- Gopkg.lock | 2 +- .../framework.v0/lock/etcd_linux_test.go | 4 +- .../gopkg.in/src-d/framework.v0/queue/amqp.go | 2 + .../src-d/framework.v0/queue/amqp_test.go | 55 +++++++++++++++++++ .../gopkg.in/src-d/framework.v0/queue/job.go | 5 ++ 5 files changed, 65 insertions(+), 3 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 96f28e69..91e40609 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -340,7 +340,7 @@ "lock", "queue" ] - revision = "0a07356ba7e9b1b9a9f793cbf5303cb55488a4e5" + revision = "86bf2e8520f2f43d4515f9e96c45ff171df054f5" [[projects]] name = "gopkg.in/src-d/go-billy-siva.v4" diff --git a/vendor/gopkg.in/src-d/framework.v0/lock/etcd_linux_test.go b/vendor/gopkg.in/src-d/framework.v0/lock/etcd_linux_test.go index a2937669..6c17ed1c 100644 --- a/vendor/gopkg.in/src-d/framework.v0/lock/etcd_linux_test.go +++ b/vendor/gopkg.in/src-d/framework.v0/lock/etcd_linux_test.go @@ -1,3 +1,5 @@ +// +build linux + package lock import ( @@ -5,8 +7,6 @@ import ( "time" ) -// +build linux - func (s *EtcdLockSuite) TestLockExpire() { assert := s.Assert() diff --git a/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go b/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go index 289ead55..782edece 100644 --- a/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go +++ b/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go @@ -169,6 +169,7 @@ func (b *AMQPBroker) Queue(name string) (Queue, error) { amqp.Table{ "x-dead-letter-exchange": rex, "x-dead-letter-routing-key": name, + "x-max-priority": uint8(PriorityUrgent), }, ) @@ -246,6 +247,7 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error { "x-dead-letter-routing-key": q.queue.Name, "x-message-ttl": int64(ttl), "x-expires": int64(ttl) * 2, + "x-max-priority": uint8(PriorityUrgent), }, ) if err != nil { diff --git a/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go b/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go index 3924da28..bb89153e 100644 --- a/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go +++ b/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go @@ -26,3 +26,58 @@ func TestNewAMQPBroker_bad_url(t *testing.T) { assert.Error(err) assert.Nil(b) } + +func sendJobs(assert *assert.Assertions, n int, p Priority, q Queue) { + for i := 0; i < n; i++ { + j, err := NewJob() + assert.NoError(err) + j.SetPriority(p) + err = j.Encode(i) + assert.NoError(err) + err = q.Publish(j) + assert.NoError(err) + } +} + +func TestAMQPPriorities(t *testing.T) { + assert := assert.New(t) + + broker, err := NewAMQPBroker(testAMQPURI) + assert.NoError(err) + assert.NotNil(broker) + + name := newName() + q, err := broker.Queue(name) + assert.NoError(err) + assert.NotNil(q) + + // Send 50 low priority jobs + sendJobs(assert, 50, PriorityLow, q) + + // Send 50 high priority jobs + sendJobs(assert, 50, PriorityUrgent, q) + + // Receive and collect priorities + iter, err := q.Consume(1) + assert.NoError(err) + assert.NotNil(iter) + + sumFirst := uint(0) + sumLast := uint(0) + + for i := 0; i < 100; i++ { + j, err := iter.Next() + assert.NoError(err) + assert.NoError(j.Ack()) + + if i < 50 { + sumFirst += uint(j.Priority) + } else { + sumLast += uint(j.Priority) + } + } + + assert.True(sumFirst > sumLast) + assert.Equal(uint(PriorityUrgent)*50, sumFirst) + assert.Equal(uint(PriorityLow)*50, sumLast) +} diff --git a/vendor/gopkg.in/src-d/framework.v0/queue/job.go b/vendor/gopkg.in/src-d/framework.v0/queue/job.go index 2ae6a2a8..be967e19 100644 --- a/vendor/gopkg.in/src-d/framework.v0/queue/job.go +++ b/vendor/gopkg.in/src-d/framework.v0/queue/job.go @@ -56,6 +56,11 @@ func NewJob() (*Job, error) { }, nil } +// SetPriority sets job priority +func (j *Job) SetPriority(priority Priority) { + j.Priority = priority +} + // Encode encodes the payload to the wire format used. func (j *Job) Encode(payload interface{}) error { var err error From 4f35af107797d2271600e5da4b0b7bf8611204c6 Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Thu, 5 Apr 2018 17:37:12 +0200 Subject: [PATCH 3/8] Use constant instead of literal for max priority Signed-off-by: Javi Fontan --- cli/borges/producer.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cli/borges/producer.go b/cli/borges/producer.go index 15454164..1fd8c874 100644 --- a/cli/borges/producer.go +++ b/cli/borges/producer.go @@ -1,7 +1,6 @@ package main import ( - "errors" "fmt" "os" @@ -29,8 +28,8 @@ type producerCmd struct { } func checkPriority(prio uint8) error { - if prio > 8 { - return errors.New("Priority must be between 0 and 8") + if prio > uint8(queue.PriorityUrgent) { + return fmt.Errorf("Priority must be between 0 and %d", queue.PriorityUrgent) } return nil From 50c3901c0441794c40010f5b4e7d370b9300b7ae Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Thu, 5 Apr 2018 18:35:37 +0200 Subject: [PATCH 4/8] Rewrite priority description and default values Signed-off-by: Javi Fontan --- cli/borges/main.go | 13 +++++++++---- cli/borges/producer.go | 19 ++++++++++++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/cli/borges/main.go b/cli/borges/main.go index c4915a15..1eab47bf 100644 --- a/cli/borges/main.go +++ b/cli/borges/main.go @@ -103,16 +103,21 @@ func main() { panic(err) } - if _, err := parser.AddCommand(producerCmdName, producerCmdShortDesc, - producerCmdLongDesc, &producerCmd{}); err != nil { + c, err := parser.AddCommand(producerCmdName, producerCmdShortDesc, + producerCmdLongDesc, &producerCmd{}) + if err != nil { panic(err) } - if _, err := parser.AddCommand(initCmdName, initCmdShortDesc, initCmdLongDesc, new(initCmd)); err != nil { + setPrioritySettings(c) + + if _, err := parser.AddCommand(initCmdName, initCmdShortDesc, + initCmdLongDesc, new(initCmd)); err != nil { panic(err) } - if _, err := parser.AddCommand(packerCmdName, packerCmdShortDesc, packerCmdLongDesc, new(packerCmd)); err != nil { + if _, err := parser.AddCommand(packerCmdName, packerCmdShortDesc, + packerCmdLongDesc, new(packerCmd)); err != nil { panic(err) } diff --git a/cli/borges/producer.go b/cli/borges/producer.go index 1fd8c874..5dfce9df 100644 --- a/cli/borges/producer.go +++ b/cli/borges/producer.go @@ -3,7 +3,10 @@ package main import ( "fmt" "os" + "strconv" + "strings" + flags "github.com/jessevdk/go-flags" "github.com/src-d/borges" "github.com/src-d/borges/storage" @@ -24,7 +27,21 @@ type producerCmd struct { MentionsQueue string `long:"mentionsqueue" default:"rovers" description:"queue name used to obtain mentions if the source type is 'mentions'"` File string `long:"file" description:"path to a file to read URLs from, used with --source=file"` RepublishBuried bool `long:"republish-buried" description:"republishes again all buried jobs before starting to listen for mentions, used with --source=mentions"` - Priority uint8 `long:"priority" default:"4" description:"priority used to enqueue jobs, goes from 0 (lowest) to 8 (highest)"` + Priority uint8 `long:"priority" default:"4" description:"priority used to enqueue jobs, goes from 0 (lowest) to :MAX: (highest)"` +} + +// Changes the priority description and default on runtime as it is not +// possible to create a dynamic tag +func setPrioritySettings(c *flags.Command) { + options := c.Options() + + for _, o := range options { + if o.LongName == "priority" { + o.Default[0] = strconv.Itoa((int(queue.PriorityNormal))) + o.Description = strings.Replace( + o.Description, ":MAX:", strconv.Itoa(int(queue.PriorityUrgent)), 1) + } + } } func checkPriority(prio uint8) error { From 717346c6ea8b273038bdb2985284d14c68e16500 Mon Sep 17 00:00:00 2001 From: kuba-- Date: Thu, 5 Apr 2018 15:23:50 +0200 Subject: [PATCH 5/8] update gitignore (add exe file) --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 72a75163..8fa75d52 100644 --- a/.gitignore +++ b/.gitignore @@ -33,4 +33,4 @@ coverage.txt # Built binaries bin/ build/ - +borges From 6f0afb05ede18dfef39dc408ff8d7ef56e6eda7f Mon Sep 17 00:00:00 2001 From: kuba-- Date: Thu, 5 Apr 2018 16:17:01 +0200 Subject: [PATCH 6/8] ignore borges.exe --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 8fa75d52..c640cb10 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,5 @@ coverage.txt bin/ build/ borges +borges.exe + From 5c177180cb2c86749df5ca9f682179f70d5105bc Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Thu, 5 Apr 2018 11:19:51 +0200 Subject: [PATCH 7/8] Update reference time Previously all reference times were unset. This also caused the repository last_commit_at to be invalid. Signed-off-by: Javi Fontan --- changes.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/changes.go b/changes.go index 0015e599..6e2d67ca 100644 --- a/changes.go +++ b/changes.go @@ -132,6 +132,7 @@ func addToChangesDfferenceBetweenOldAndNewRefs( CreatedAt: createdAt, UpdatedAt: now, } + newReference.Time = newRef.Time c.Add(newReference) return nil @@ -148,6 +149,7 @@ func addToChangesDfferenceBetweenOldAndNewRefs( CreatedAt: oldRef.CreatedAt, UpdatedAt: now, } + updateReference.Time = newRef.Time c.Update(oldRef, updateReference) } From 8f1241c07cc02fe719d710f805507c3c895afa51 Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Thu, 5 Apr 2018 16:26:34 +0200 Subject: [PATCH 8/8] Add updater command The updater wakes up periodically and searches for repositories to update. There is a configurable maximum of jobs per update cycle. Currently it has three searches that are done one after the other until the maximum number of jobs is reached or all the searches ar finished: * Repositories updated x days ago (we call these "old"). The priority for these update jobs is high. * Repositories updated x/2 days ago. The priority is normal. * Any other repository ordered by the difference between the last commit time and the last update time. Stagnant repos will have longer difference between those two times. The priority is low. Signed-off-by: Javi Fontan --- cli/borges/main.go | 4 + cli/borges/updater.go | 41 +++++++++ updater.go | 207 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 252 insertions(+) create mode 100644 cli/borges/updater.go create mode 100644 updater.go diff --git a/cli/borges/main.go b/cli/borges/main.go index 1eab47bf..c8617f44 100644 --- a/cli/borges/main.go +++ b/cli/borges/main.go @@ -121,6 +121,10 @@ func main() { panic(err) } + if _, err := parser.AddCommand(updaterCmdName, updaterCmdShortDesc, updaterCmdLongDesc, new(updaterCmd)); err != nil { + panic(err) + } + if _, err := parser.Parse(); err != nil { if err, ok := err.(*flags.Error); ok { if err.Type == flags.ErrHelp { diff --git a/cli/borges/updater.go b/cli/borges/updater.go new file mode 100644 index 00000000..2199939d --- /dev/null +++ b/cli/borges/updater.go @@ -0,0 +1,41 @@ +package main + +import ( + "time" + + "github.com/src-d/borges" + core "gopkg.in/src-d/core-retrieval.v0" +) + +const ( + updaterCmdName = "updater" + updaterCmdShortDesc = "creates new jobs to update fetched repos" + updaterCmdLongDesc = "" +) + +type updaterCmd struct { + cmd + Time float32 `long:"time" default:"5" description:"time in minutes between each update, it can be fractional (0.5 = 30 seconds)"` + MaxJobs uint `long:"max" default:"200" description:"maximum number of jobs sent per update"` + OldJobs float32 `long:"old" default:"30" description:"time in days when a repository update is considered old"` +} + +func (c *updaterCmd) Execute(args []string) error { + c.init() + + b := core.Broker() + defer b.Close() + q, err := b.Queue(c.Queue) + if err != nil { + + } + + t := time.Duration(c.Time * float32(time.Minute)) + o := time.Duration(c.OldJobs * float32(time.Hour*24)) + + db := core.Database() + updater := borges.NewUpdater(log, db, q, t, c.MaxJobs, o) + updater.Start() + + return nil +} diff --git a/updater.go b/updater.go new file mode 100644 index 00000000..ff30f4f4 --- /dev/null +++ b/updater.go @@ -0,0 +1,207 @@ +package borges + +import ( + "database/sql" + "time" + + "github.com/inconshreveable/log15" + uuid "github.com/satori/go.uuid" + "github.com/src-d/borges/storage" + core "gopkg.in/src-d/core-retrieval.v0" + "gopkg.in/src-d/core-retrieval.v0/model" + "gopkg.in/src-d/framework.v0/queue" + kallax "gopkg.in/src-d/go-kallax.v1" +) + +const ( + queryByUpdate = `select id from repositories where status='fetched' and + updated_at < $1 order by updated_at limit $2` + queryByLastCommit = `select id from repositories where status='fetched' + order by age(updated_at, last_commit_at) limit $1` +) + +type updaterSearchFunction struct { + name string + function func(int) ([]kallax.ULID, error) + priority queue.Priority +} + +// Updater finds fetched repositories to be updated +type Updater struct { + log log15.Logger + + db *sql.DB + storage storage.RepoStore + queue queue.Queue + + // Time between update actions + cadence time.Duration + + // Maximum number of jobs to submit per update + maxJobs uint + + // Age when we consider a repository update to be old + oldJobs time.Duration +} + +// NewUpdater creates a new Updater +func NewUpdater( + log log15.Logger, + db *sql.DB, + queue queue.Queue, + cadence time.Duration, + maxJobs uint, + oldJobs time.Duration, +) *Updater { + return &Updater{ + log: log, + db: db, + storage: storage.FromDatabase(core.Database()), + queue: queue, + cadence: cadence, + maxJobs: maxJobs, + oldJobs: oldJobs, + } +} + +// Start initializes the update process +func (u *Updater) Start() { + ticker := time.NewTicker(u.cadence) + + for t := range ticker.C { + u.execute(t) + } +} + +func (u *Updater) searchFunctions() []updaterSearchFunction { + return []updaterSearchFunction{ + {"old repositories", u.reposOld, queue.PriorityUrgent}, + {"almost old repositories", u.reposAlmostOld, queue.PriorityNormal}, + {"best effort", u.reposBestEffort, queue.PriorityLow}, + } +} + +func (u *Updater) execute(t time.Time) { + executeStart := time.Now() + jobsLeft := int(u.maxJobs) + + for _, f := range u.searchFunctions() { + log := u.log.New("function", f.name) + + log.Debug("search start", "priority", f.priority) + start := time.Now() + + num, err := u.executeFunction(log, f, jobsLeft) + if err != nil { + continue + } + + log.Debug("search end", "duration", time.Since(start), + "jobs", num) + + jobsLeft -= num + + if jobsLeft <= 0 { + log.Debug("maximum number of jobs") + return + } + } + + u.log.Info("Update finished", "jobs", int(u.maxJobs)-jobsLeft, + "duration", time.Since(executeStart)) +} + +func (u *Updater) executeFunction( + log log15.Logger, + f updaterSearchFunction, + limit int, +) (int, error) { + log = log.New("function", f.name) + + res, err := f.function(limit) + if err != nil { + log.Error("Error executing search function", "query", err) + return 0, err + } + + for _, s := range res { + log = log.New("uuid", s) + + id, err := uuid.FromString(s.String()) + if err != nil { + log.Error("Could not parse uuid", "error", err) + continue + } + + job, err := queue.NewJob() + if err != nil { + log.Error("Could not create job", "error", err) + continue + } + + payload := Job{RepositoryID: id} + if err := job.Encode(payload); err != nil { + log.Error("Could not encode job", "error", err) + continue + } + + job.SetPriority(f.priority) + + repo, err := u.storage.Get(s) + if err != nil { + log.Error("Could not get repository from database", "error", err) + continue + } + + err = u.storage.SetStatus(repo, model.Pending) + if err != nil { + log.Error("Could not change repository status in the database", + "error", err) + continue + } + + if err := u.queue.Publish(job); err != nil { + log.Error("Could not submit job", "error", err) + continue + } + } + + return len(res), nil +} + +func (u *Updater) reposBestEffort(limit int) ([]kallax.ULID, error) { + return u.transformResult(u.db.Query(queryByLastCommit, limit)) +} + +func (u *Updater) reposOld(limit int) ([]kallax.ULID, error) { + date := time.Now().Add(-1 * u.oldJobs) + return u.transformResult(u.db.Query(queryByUpdate, date, limit)) +} + +func (u *Updater) reposAlmostOld(limit int) ([]kallax.ULID, error) { + date := time.Now().Add(-1 * u.oldJobs / 2) + return u.transformResult(u.db.Query(queryByUpdate, date, limit)) +} + +func (u *Updater) transformResult( + rows *sql.Rows, + err error, +) ([]kallax.ULID, error) { + if err != nil { + return nil, err + } + + result := make([]kallax.ULID, 0, u.maxJobs) + for rows.Next() { + var r kallax.ULID + err := rows.Scan(&r) + if err != nil { + u.log.Error("Error retrieving row", err) + continue + } + + result = append(result, r) + } + + return result, rows.Err() +}