Skip to content

spout_aggregation_changes#1

Open
NPraneeth wants to merge 3 commits intodevelopfrom
add-crushdown-spout
Open

spout_aggregation_changes#1
NPraneeth wants to merge 3 commits intodevelopfrom
add-crushdown-spout

Conversation

@NPraneeth
Copy link
Owner

The PR takes care of adding the aggregation capability in the spout. When FMMergeableWithSource is opted in and followed by the summer, then the map-side aggregation happens in the spout ( localized ).

}

private def scheduleSpout(jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {
private def scheduleSpout[K, V](jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coming back to K and V, where do we specify them? I don't think they can be inferred from arguments (so they'd default to Nothing). Didn't we want to leave this untyped?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make the default to Any, Any.
Or we can specify [Any,Any] when we call it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say let's push the Any down a bit further. The most conspicuous thing about putting the type parameters here is that spouts are not necessarily key-value.

outputCollector: SpoutOutputCollector): Unit = {
val adapterCollector = new TransformingOutputCollector(outputCollector, _.get(0).asInstanceOf[JList[AnyRef]])
self.open(conf, topologyContext, adapterCollector)
adapterCollector = new AggregatorOutputCollector(outputCollector, _.get(0).asInstanceOf[JList[AnyRef]], summerBuilder, summerShards)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make two OutputCollector classes. The flow of tuples will look like this:
spout: List((k, v)) ->
aggregatingOutputCollector: List(CMap) ->
transformingOutputCollector: List(Int, CMap)

This is a wrapper method to call the emit with appropriate signature
based on the arguments.
*/
(messageIds.isEmpty, stream.isEmpty) match {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This special handling of empty stream name seems odd to me. What does empty stream name signify? Can we use Option[String] instead of relying on special value of stream name?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants