Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ coverage.txt
# Built binaries
bin/
build/
borges
borges.exe

2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ http://github.com/c/repo3
http://github.com/d/repo4.git
```

You can change the priority of jobs produced with `--priority` option. It is a number from 0 to 8 where 0 is the lowest priority:

borges producer --source=file --file /path/to/file --priority 8

When jobs fail they're sent to the buried queue. If you want to requeue them, you can pass the `--republish-buried` flag (this only works for the `mentions` source). For example:

```
Expand Down
2 changes: 2 additions & 0 deletions changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func addToChangesDfferenceBetweenOldAndNewRefs(
CreatedAt: createdAt,
UpdatedAt: now,
}
newReference.Time = newRef.Time
c.Add(newReference)

return nil
Expand All @@ -148,6 +149,7 @@ func addToChangesDfferenceBetweenOldAndNewRefs(
CreatedAt: oldRef.CreatedAt,
UpdatedAt: now,
}
updateReference.Time = newRef.Time
c.Update(oldRef, updateReference)
}

Expand Down
17 changes: 13 additions & 4 deletions cli/borges/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,25 @@ func main() {
panic(err)
}

if _, err := parser.AddCommand(producerCmdName, producerCmdShortDesc,
producerCmdLongDesc, &producerCmd{}); err != nil {
c, err := parser.AddCommand(producerCmdName, producerCmdShortDesc,
producerCmdLongDesc, &producerCmd{})
if err != nil {
panic(err)
}

setPrioritySettings(c)

if _, err := parser.AddCommand(initCmdName, initCmdShortDesc,
initCmdLongDesc, new(initCmd)); err != nil {
panic(err)
}

if _, err := parser.AddCommand(initCmdName, initCmdShortDesc, initCmdLongDesc, new(initCmd)); err != nil {
if _, err := parser.AddCommand(packerCmdName, packerCmdShortDesc,
packerCmdLongDesc, new(packerCmd)); err != nil {
panic(err)
}

if _, err := parser.AddCommand(packerCmdName, packerCmdShortDesc, packerCmdLongDesc, new(packerCmd)); err != nil {
if _, err := parser.AddCommand(updaterCmdName, updaterCmdShortDesc, updaterCmdLongDesc, new(updaterCmd)); err != nil {
panic(err)
}

Expand Down
33 changes: 32 additions & 1 deletion cli/borges/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package main
import (
"fmt"
"os"
"strconv"
"strings"

flags "github.com/jessevdk/go-flags"
"github.com/src-d/borges"
"github.com/src-d/borges/storage"

Expand All @@ -24,11 +27,39 @@ type producerCmd struct {
MentionsQueue string `long:"mentionsqueue" default:"rovers" description:"queue name used to obtain mentions if the source type is 'mentions'"`
File string `long:"file" description:"path to a file to read URLs from, used with --source=file"`
RepublishBuried bool `long:"republish-buried" description:"republishes again all buried jobs before starting to listen for mentions, used with --source=mentions"`
Priority uint8 `long:"priority" default:"4" description:"priority used to enqueue jobs, goes from 0 (lowest) to :MAX: (highest)"`
}

// Changes the priority description and default on runtime as it is not
// possible to create a dynamic tag
func setPrioritySettings(c *flags.Command) {
options := c.Options()

for _, o := range options {
if o.LongName == "priority" {
o.Default[0] = strconv.Itoa((int(queue.PriorityNormal)))
o.Description = strings.Replace(
o.Description, ":MAX:", strconv.Itoa(int(queue.PriorityUrgent)), 1)
}
}
}

func checkPriority(prio uint8) error {
if prio > uint8(queue.PriorityUrgent) {
return fmt.Errorf("Priority must be between 0 and %d", queue.PriorityUrgent)
}

return nil
}

func (c *producerCmd) Execute(args []string) error {
c.init()

err := checkPriority(c.Priority)
if err != nil {
return err
}

b := core.Broker()
defer b.Close()
q, err := b.Queue(c.Queue)
Expand All @@ -42,7 +73,7 @@ func (c *producerCmd) Execute(args []string) error {
}
defer ioutil.CheckClose(ji, &err)

p := borges.NewProducer(log, ji, q)
p := borges.NewProducer(log, ji, q, queue.Priority(c.Priority))
p.Start()

return err
Expand Down
41 changes: 41 additions & 0 deletions cli/borges/updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package main

import (
"time"

"github.com/src-d/borges"
core "gopkg.in/src-d/core-retrieval.v0"
)

const (
updaterCmdName = "updater"
updaterCmdShortDesc = "creates new jobs to update fetched repos"
updaterCmdLongDesc = ""
)

type updaterCmd struct {
cmd
Time float32 `long:"time" default:"5" description:"time in minutes between each update, it can be fractional (0.5 = 30 seconds)"`
MaxJobs uint `long:"max" default:"200" description:"maximum number of jobs sent per update"`
OldJobs float32 `long:"old" default:"30" description:"time in days when a repository update is considered old"`
}

func (c *updaterCmd) Execute(args []string) error {
c.init()

b := core.Broker()
defer b.Close()
q, err := b.Queue(c.Queue)
if err != nil {

}

t := time.Duration(c.Time * float32(time.Minute))
o := time.Duration(c.OldJobs * float32(time.Hour*24))

db := core.Database()
updater := borges.NewUpdater(log, db, q, t, c.MaxJobs, o)
updater.Start()

return nil
}
11 changes: 10 additions & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@ type Producer struct {
running bool
startOnce *sync.Once
stopOnce *sync.Once
priority queue.Priority

// used by Stop() to wait until Start() has finished
startIsRunning chan struct{}
}

// NewProducer creates a new producer.
func NewProducer(log log15.Logger, jobIter JobIter, queue queue.Queue) *Producer {
func NewProducer(
log log15.Logger,
jobIter JobIter,
queue queue.Queue,
prio queue.Priority,
) *Producer {
return &Producer{
log: log.New("mode", "producer"),
jobIter: jobIter,
queue: queue,
startOnce: &sync.Once{},
stopOnce: &sync.Once{},
priority: prio,
}
}

Expand Down Expand Up @@ -95,6 +102,8 @@ func (p *Producer) add(j *Job) error {
return err
}

qj.SetPriority(p.priority)

return p.queue.Publish(qj)
}

Expand Down
5 changes: 3 additions & 2 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func (s *ProducerSuite) SetupSuite() {

func (s *ProducerSuite) newProducer() *Producer {
storer := storage.FromDatabase(s.DB)
return NewProducer(log15.New(), NewMentionJobIter(s.mentionsQueue, storer), s.queue)
return NewProducer(log15.New(), NewMentionJobIter(s.mentionsQueue, storer),
s.queue, queue.PriorityNormal)
}

func (s *ProducerSuite) newJob() *queue.Job {
Expand Down Expand Up @@ -112,7 +113,7 @@ func (s *ProducerSuite) TestStartStop_TwoEqualsJobs() {
}

func (s *ProducerSuite) TestStartStop_ErrorNoNotifier() {
p := NewProducer(log15.New(), &DummyJobIter{}, s.queue)
p := NewProducer(log15.New(), &DummyJobIter{}, s.queue, queue.PriorityNormal)

go p.Start()

Expand Down
Loading