diff --git a/.gitignore b/.gitignore index 72a75163..c640cb10 100644 --- a/.gitignore +++ b/.gitignore @@ -33,4 +33,6 @@ coverage.txt # Built binaries bin/ build/ +borges +borges.exe diff --git a/Gopkg.lock b/Gopkg.lock index 96f28e69..91e40609 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -340,7 +340,7 @@ "lock", "queue" ] - revision = "0a07356ba7e9b1b9a9f793cbf5303cb55488a4e5" + revision = "86bf2e8520f2f43d4515f9e96c45ff171df054f5" [[projects]] name = "gopkg.in/src-d/go-billy-siva.v4" diff --git a/README.md b/README.md index 75c1fe28..e3a2bc7f 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,10 @@ http://github.com/c/repo3 http://github.com/d/repo4.git ``` +You can change the priority of the produced jobs with the `--priority` option. It is a number from 0 to 8, where 0 is the lowest priority and 8 the highest: + + borges producer --source=file --file /path/to/file --priority 8 + When jobs fail they're sent to the buried queue. If you want to requeue them, you can pass the `--republish-buried` flag (this only works for the `mentions` source). 
For example: ``` diff --git a/changes.go b/changes.go index 0015e599..6e2d67ca 100644 --- a/changes.go +++ b/changes.go @@ -132,6 +132,7 @@ func addToChangesDfferenceBetweenOldAndNewRefs( CreatedAt: createdAt, UpdatedAt: now, } + newReference.Time = newRef.Time c.Add(newReference) return nil @@ -148,6 +149,7 @@ func addToChangesDfferenceBetweenOldAndNewRefs( CreatedAt: oldRef.CreatedAt, UpdatedAt: now, } + updateReference.Time = newRef.Time c.Update(oldRef, updateReference) } diff --git a/cli/borges/main.go b/cli/borges/main.go index c4915a15..c8617f44 100644 --- a/cli/borges/main.go +++ b/cli/borges/main.go @@ -103,16 +103,25 @@ func main() { panic(err) } - if _, err := parser.AddCommand(producerCmdName, producerCmdShortDesc, - producerCmdLongDesc, &producerCmd{}); err != nil { + c, err := parser.AddCommand(producerCmdName, producerCmdShortDesc, + producerCmdLongDesc, &producerCmd{}) + if err != nil { + panic(err) + } + + setPrioritySettings(c) + + if _, err := parser.AddCommand(initCmdName, initCmdShortDesc, + initCmdLongDesc, new(initCmd)); err != nil { panic(err) } - if _, err := parser.AddCommand(initCmdName, initCmdShortDesc, initCmdLongDesc, new(initCmd)); err != nil { + if _, err := parser.AddCommand(packerCmdName, packerCmdShortDesc, + packerCmdLongDesc, new(packerCmd)); err != nil { panic(err) } - if _, err := parser.AddCommand(packerCmdName, packerCmdShortDesc, packerCmdLongDesc, new(packerCmd)); err != nil { + if _, err := parser.AddCommand(updaterCmdName, updaterCmdShortDesc, updaterCmdLongDesc, new(updaterCmd)); err != nil { panic(err) } diff --git a/cli/borges/producer.go b/cli/borges/producer.go index c10f60a6..5dfce9df 100644 --- a/cli/borges/producer.go +++ b/cli/borges/producer.go @@ -3,7 +3,10 @@ package main import ( "fmt" "os" + "strconv" + "strings" + flags "github.com/jessevdk/go-flags" "github.com/src-d/borges" "github.com/src-d/borges/storage" @@ -24,11 +27,39 @@ type producerCmd struct { MentionsQueue string `long:"mentionsqueue" 
default:"rovers" description:"queue name used to obtain mentions if the source type is 'mentions'"`
 	File            string `long:"file" description:"path to a file to read URLs from, used with --source=file"`
 	RepublishBuried bool   `long:"republish-buried" description:"republishes again all buried jobs before starting to listen for mentions, used with --source=mentions"`
+	Priority        uint8  `long:"priority" default:"4" description:"priority used to enqueue jobs, goes from 0 (lowest) to :MAX: (highest)"`
+}
+
+// setPrioritySettings changes the priority description and default at
+// runtime, as it is not possible to create a dynamic struct tag.
+func setPrioritySettings(c *flags.Command) {
+	options := c.Options()
+
+	for _, o := range options {
+		if o.LongName == "priority" {
+			o.Default[0] = strconv.Itoa(int(queue.PriorityNormal))
+			o.Description = strings.Replace(
+				o.Description, ":MAX:", strconv.Itoa(int(queue.PriorityUrgent)), 1)
+		}
+	}
+}
+
+func checkPriority(prio uint8) error {
+	if prio > uint8(queue.PriorityUrgent) {
+		return fmt.Errorf("Priority must be between 0 and %d", queue.PriorityUrgent)
+	}
+
+	return nil
 }
 
 func (c *producerCmd) Execute(args []string) error {
 	c.init()
 
+	err := checkPriority(c.Priority)
+	if err != nil {
+		return err
+	}
+
 	b := core.Broker()
 	defer b.Close()
 	q, err := b.Queue(c.Queue)
@@ -42,7 +73,7 @@ func (c *producerCmd) Execute(args []string) error {
 	}
 	defer ioutil.CheckClose(ji, &err)
 
-	p := borges.NewProducer(log, ji, q)
+	p := borges.NewProducer(log, ji, q, queue.Priority(c.Priority))
 	p.Start()
 
 	return err
diff --git a/cli/borges/updater.go b/cli/borges/updater.go
new file mode 100644
index 00000000..2199939d
--- /dev/null
+++ b/cli/borges/updater.go
@@ -0,0 +1,54 @@
+package main
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/src-d/borges"
+	core "gopkg.in/src-d/core-retrieval.v0"
+)
+
+const (
+	updaterCmdName      = "updater"
+	updaterCmdShortDesc = "creates new jobs to update fetched repos"
+	updaterCmdLongDesc  = ""
+)
+
+// updaterCmd holds the command line options of the updater command.
+type updaterCmd struct {
+	cmd
+	Time    float32 `long:"time" default:"5" description:"time in minutes between each update, it can be fractional (0.5 = 30 seconds)"`
+	MaxJobs uint    `long:"max" default:"200" description:"maximum number of jobs sent per update"`
+	OldJobs float32 `long:"old" default:"30" description:"time in days when a repository update is considered old"`
+}
+
+// Execute validates the options, opens the jobs queue and starts the
+// updater. It returns an error when the options are invalid or the queue
+// cannot be opened.
+func (c *updaterCmd) Execute(args []string) error {
+	c.init()
+
+	// Convert through float64, float32 cannot hold nanosecond counts of
+	// this size exactly.
+	t := time.Duration(float64(c.Time) * float64(time.Minute))
+	o := time.Duration(float64(c.OldJobs) * float64(24*time.Hour))
+
+	// The updater runs on a time.Ticker, which panics on a non-positive
+	// interval.
+	if t <= 0 {
+		return fmt.Errorf("time must be greater than 0, got %v minutes", c.Time)
+	}
+
+	b := core.Broker()
+	defer b.Close()
+	q, err := b.Queue(c.Queue)
+	if err != nil {
+		return err
+	}
+
+	db := core.Database()
+	updater := borges.NewUpdater(log, db, q, t, c.MaxJobs, o)
+	updater.Start()
+
+	return nil
+}
diff --git a/producer.go b/producer.go
index 02f51476..756cb547 100644
--- a/producer.go
+++ b/producer.go
@@ -18,19 +18,26 @@ type Producer struct {
 	running   bool
 	startOnce *sync.Once
 	stopOnce  *sync.Once
+	priority  queue.Priority
 
 	// used by Stop() to wait until Start() has finished
 	startIsRunning chan struct{}
 }
 
 // NewProducer creates a new producer.
-func NewProducer(log log15.Logger, jobIter JobIter, queue queue.Queue) *Producer {
+func NewProducer(
+	log log15.Logger,
+	jobIter JobIter,
+	queue queue.Queue,
+	prio queue.Priority,
+) *Producer {
 	return &Producer{
 		log:       log.New("mode", "producer"),
 		jobIter:   jobIter,
 		queue:     queue,
 		startOnce: &sync.Once{},
 		stopOnce:  &sync.Once{},
+		priority:  prio,
 	}
 }
 
@@ -95,6 +102,8 @@ func (p *Producer) add(j *Job) error {
 		return err
 	}
 
+	qj.SetPriority(p.priority)
+
 	return p.queue.Publish(qj)
 }
 
diff --git a/producer_test.go b/producer_test.go
index 3c11ee8b..1de6598b 100644
--- a/producer_test.go
+++ b/producer_test.go
@@ -38,7 +38,8 @@ func (s *ProducerSuite) SetupSuite() {
 
 func (s *ProducerSuite) newProducer() *Producer {
 	storer := storage.FromDatabase(s.DB)
-	return NewProducer(log15.New(), NewMentionJobIter(s.mentionsQueue, storer), s.queue)
+	return NewProducer(log15.New(), NewMentionJobIter(s.mentionsQueue, storer),
+		s.queue, queue.PriorityNormal)
 }
 
 func (s *ProducerSuite) newJob() *queue.Job {
@@ -112,7 +113,7 @@ func (s *ProducerSuite) TestStartStop_TwoEqualsJobs() {
 }
 
 func (s *ProducerSuite) TestStartStop_ErrorNoNotifier() {
-	p := NewProducer(log15.New(), &DummyJobIter{}, s.queue)
+	p := NewProducer(log15.New(), &DummyJobIter{}, s.queue, queue.PriorityNormal)
 
 	go p.Start()
 
diff --git a/updater.go b/updater.go
new file mode 100644
index 00000000..ff30f4f4
--- /dev/null
+++ b/updater.go
@@ -0,0 +1,248 @@
+package borges
+
+import (
+	"database/sql"
+	"time"
+
+	"github.com/inconshreveable/log15"
+	uuid "github.com/satori/go.uuid"
+	"github.com/src-d/borges/storage"
+	"gopkg.in/src-d/core-retrieval.v0/model"
+	"gopkg.in/src-d/framework.v0/queue"
+	kallax "gopkg.in/src-d/go-kallax.v1"
+)
+
+const (
+	// queryByUpdate returns up to $2 ids of fetched repositories updated
+	// before $1, least recently updated first.
+	queryByUpdate = `select id from repositories where status='fetched' and
+	updated_at < $1 order by updated_at limit $2`
+
+	// queryByLastCommit returns up to $1 ids of fetched repositories ordered
+	// by age(updated_at, last_commit_at).
+	queryByLastCommit = `select id from repositories where status='fetched'
+	order by age(updated_at, last_commit_at) limit $1`
+)
+
+// updaterSearchFunction pairs a repository search with the priority used for
+// the jobs created from its results.
+type updaterSearchFunction struct {
+	name     string
+	function func(int) ([]kallax.ULID, error)
+	priority queue.Priority
+}
+
+// Updater finds fetched repositories to be updated
+type Updater struct {
+	log log15.Logger
+
+	db      *sql.DB
+	storage storage.RepoStore
+	queue   queue.Queue
+
+	// Time between update actions
+	cadence time.Duration
+
+	// Maximum number of jobs to submit per update
+	maxJobs uint
+
+	// Age when we consider a repository update to be old
+	oldJobs time.Duration
+}
+
+// NewUpdater creates a new Updater. Every cadence it searches db for fetched
+// repositories and publishes at most maxJobs update jobs to queue; oldJobs is
+// the time since the last update after which a repository is considered old.
+func NewUpdater(
+	log log15.Logger,
+	db *sql.DB,
+	queue queue.Queue,
+	cadence time.Duration,
+	maxJobs uint,
+	oldJobs time.Duration,
+) *Updater {
+	// The store is built on the injected connection so that searches and
+	// status changes always go to the same database.
+	return &Updater{
+		log:     log,
+		db:      db,
+		storage: storage.FromDatabase(db),
+		queue:   queue,
+		cadence: cadence,
+		maxJobs: maxJobs,
+		oldJobs: oldJobs,
+	}
+}
+
+// Start runs an update every cadence. It blocks the caller: the ticker
+// channel is never closed, so the loop does not end. cadence must be
+// positive because time.NewTicker panics otherwise.
+func (u *Updater) Start() {
+	ticker := time.NewTicker(u.cadence)
+	defer ticker.Stop()
+
+	for t := range ticker.C {
+		u.execute(t)
+	}
+}
+
+// searchFunctions lists the searches of one update run, most urgent first.
+func (u *Updater) searchFunctions() []updaterSearchFunction {
+	return []updaterSearchFunction{
+		{"old repositories", u.reposOld, queue.PriorityUrgent},
+		{"almost old repositories", u.reposAlmostOld, queue.PriorityNormal},
+		{"best effort", u.reposBestEffort, queue.PriorityLow},
+	}
+}
+
+// execute performs one update run: it goes through the search functions in
+// order until maxJobs jobs have been published and logs a summary.
+func (u *Updater) execute(t time.Time) {
+	executeStart := time.Now()
+	jobsLeft := int(u.maxJobs)
+
+	for _, f := range u.searchFunctions() {
+		log := u.log.New("function", f.name)
+
+		log.Debug("search start", "priority", f.priority)
+		start := time.Now()
+
+		num, err := u.executeFunction(log, f, jobsLeft)
+		if err != nil {
+			// Already logged by executeFunction, try the next search.
+			continue
+		}
+
+		log.Debug("search end", "duration", time.Since(start),
+			"jobs", num)
+
+		jobsLeft -= num
+
+		if jobsLeft <= 0 {
+			// Leave the loop instead of returning so that the summary
+			// below is logged for full runs too.
+			log.Debug("maximum number of jobs")
+			break
+		}
+	}
+
+	u.log.Info("Update finished", "jobs", int(u.maxJobs)-jobsLeft,
+		"duration", time.Since(executeStart))
+}
+
+// executeFunction runs the search f for at most limit repositories and
+// publishes an update job with the priority of f for each result.
+// Repositories that fail at any step are logged and skipped. It returns the
+// number of jobs actually published, or the error of the search itself.
+// log is expected to carry the search name already, as execute adds it.
+func (u *Updater) executeFunction(
+	log log15.Logger,
+	f updaterSearchFunction,
+	limit int,
+) (int, error) {
+	res, err := f.function(limit)
+	if err != nil {
+		log.Error("Error executing search function", "error", err)
+		return 0, err
+	}
+
+	published := 0
+	for _, s := range res {
+		// Use a logger scoped to this repository. Reassigning log would
+		// add one more "uuid" entry on every iteration.
+		rlog := log.New("uuid", s)
+
+		id, err := uuid.FromString(s.String())
+		if err != nil {
+			rlog.Error("Could not parse uuid", "error", err)
+			continue
+		}
+
+		job, err := queue.NewJob()
+		if err != nil {
+			rlog.Error("Could not create job", "error", err)
+			continue
+		}
+
+		payload := Job{RepositoryID: id}
+		if err := job.Encode(payload); err != nil {
+			rlog.Error("Could not encode job", "error", err)
+			continue
+		}
+
+		job.SetPriority(f.priority)
+
+		repo, err := u.storage.Get(s)
+		if err != nil {
+			rlog.Error("Could not get repository from database", "error", err)
+			continue
+		}
+
+		// NOTE(review): if Publish fails below the repository stays pending
+		// without a job and the searches, which only look at fetched
+		// repositories, will not pick it up again. Consider restoring the
+		// previous status on failure.
+		err = u.storage.SetStatus(repo, model.Pending)
+		if err != nil {
+			rlog.Error("Could not change repository status in the database",
+				"error", err)
+			continue
+		}
+
+		if err := u.queue.Publish(job); err != nil {
+			rlog.Error("Could not submit job", "error", err)
+			continue
+		}
+
+		published++
+	}
+
+	return published, nil
+}
+
+// reposBestEffort returns up to limit fetched repositories in the order
+// defined by queryByLastCommit.
+func (u *Updater) reposBestEffort(limit int) ([]kallax.ULID, error) {
+	return u.transformResult(u.db.Query(queryByLastCommit, limit))
+}
+
+// reposOld returns up to limit fetched repositories whose last update is
+// older than oldJobs.
+func (u *Updater) reposOld(limit int) ([]kallax.ULID, error) {
+	date := time.Now().Add(-1 * u.oldJobs)
+	return u.transformResult(u.db.Query(queryByUpdate, date, limit))
+}
+
+// reposAlmostOld returns up to limit fetched repositories whose last update
+// is older than half of oldJobs.
+func (u *Updater) reposAlmostOld(limit int) ([]kallax.ULID, error) {
+	date := time.Now().Add(-1 * u.oldJobs / 2)
+	return u.transformResult(u.db.Query(queryByUpdate, date, limit))
+}
+
+// transformResult collects the ids returned by a query. It takes the two
+// results of sql.DB.Query so it can wrap the call directly. Rows that cannot
+// be scanned are logged and skipped.
+func (u *Updater) transformResult(
+	rows *sql.Rows,
+	err error,
+) ([]kallax.ULID, error) {
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	result := make([]kallax.ULID, 0, u.maxJobs)
+	for rows.Next() {
+		var r kallax.ULID
+		err := rows.Scan(&r)
+		if err != nil {
+			u.log.Error("Error retrieving row", "error", err)
+			continue
+		}
+
+		result = append(result, r)
+	}
+
+	return result, rows.Err()
+}
diff --git a/vendor/gopkg.in/src-d/framework.v0/lock/etcd_linux_test.go b/vendor/gopkg.in/src-d/framework.v0/lock/etcd_linux_test.go
index a2937669..6c17ed1c 100644
--- a/vendor/gopkg.in/src-d/framework.v0/lock/etcd_linux_test.go
+++ b/vendor/gopkg.in/src-d/framework.v0/lock/etcd_linux_test.go
@@ -1,3 +1,5 @@
+// +build linux
+
 package lock
 
 import (
@@ -5,8 +7,6 @@ import (
 	"time"
 )
 
-// +build linux
-
 func (s *EtcdLockSuite) TestLockExpire() {
 	assert := s.Assert()
 
diff --git a/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go b/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go
index 289ead55..782edece 100644
--- a/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go
+++ b/vendor/gopkg.in/src-d/framework.v0/queue/amqp.go
@@ -169,6 +169,7 @@ func (b *AMQPBroker) Queue(name string) (Queue, error) {
 		amqp.Table{
 			"x-dead-letter-exchange":    rex,
 			"x-dead-letter-routing-key": name,
+			"x-max-priority":            uint8(PriorityUrgent),
 		},
 	)
 
@@ -246,6 +247,7 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
 			"x-dead-letter-routing-key": q.queue.Name,
 			"x-message-ttl":             int64(ttl),
 			"x-expires":                 int64(ttl) * 2,
+			"x-max-priority":            uint8(PriorityUrgent),
 		},
 	)
 	if err != nil {
diff --git a/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go b/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go
index 3924da28..bb89153e 100644
--- a/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go
+++ b/vendor/gopkg.in/src-d/framework.v0/queue/amqp_test.go
@@ -26,3 +26,58 @@ func TestNewAMQPBroker_bad_url(t *testing.T) {
 	assert.Error(err)
 	assert.Nil(b)
 }
+
+func sendJobs(assert *assert.Assertions, n int, p Priority, q Queue) {
+	for i := 0; i < n; i++ {
+		j, err := NewJob()
+		assert.NoError(err)
+		j.SetPriority(p)
+		err = j.Encode(i)
+		assert.NoError(err)
+		err = q.Publish(j)
+		assert.NoError(err)
+	}
+}
+
+func TestAMQPPriorities(t *testing.T) {
+	assert := assert.New(t)
+
+	broker, err := 
NewAMQPBroker(testAMQPURI) + assert.NoError(err) + assert.NotNil(broker) + + name := newName() + q, err := broker.Queue(name) + assert.NoError(err) + assert.NotNil(q) + + // Send 50 low priority jobs + sendJobs(assert, 50, PriorityLow, q) + + // Send 50 high priority jobs + sendJobs(assert, 50, PriorityUrgent, q) + + // Receive and collect priorities + iter, err := q.Consume(1) + assert.NoError(err) + assert.NotNil(iter) + + sumFirst := uint(0) + sumLast := uint(0) + + for i := 0; i < 100; i++ { + j, err := iter.Next() + assert.NoError(err) + assert.NoError(j.Ack()) + + if i < 50 { + sumFirst += uint(j.Priority) + } else { + sumLast += uint(j.Priority) + } + } + + assert.True(sumFirst > sumLast) + assert.Equal(uint(PriorityUrgent)*50, sumFirst) + assert.Equal(uint(PriorityLow)*50, sumLast) +} diff --git a/vendor/gopkg.in/src-d/framework.v0/queue/job.go b/vendor/gopkg.in/src-d/framework.v0/queue/job.go index 2ae6a2a8..be967e19 100644 --- a/vendor/gopkg.in/src-d/framework.v0/queue/job.go +++ b/vendor/gopkg.in/src-d/framework.v0/queue/job.go @@ -56,6 +56,11 @@ func NewJob() (*Job, error) { }, nil } +// SetPriority sets job priority +func (j *Job) SetPriority(priority Priority) { + j.Priority = priority +} + // Encode encodes the payload to the wire format used. func (j *Job) Encode(payload interface{}) error { var err error