| } | ||
| private def scheduleSpout(jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { | ||
| private def scheduleSpout[K, V](jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { |
Coming back to K and V, where do we specify them? I don't think they can be inferred from arguments (so they'd default to Nothing). Didn't we want to leave this untyped?
We can make them default to Any, Any.
Or we can specify [Any, Any] when we call it.
I would say let's push the Any down a bit further. The most conspicuous thing about putting the type parameters here is that spouts are not necessarily key-value.
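To illustrate the inference concern raised in this thread, here is a minimal sketch. `scheduleTyped` and `scheduleUntyped` are hypothetical stand-ins, not the real `scheduleSpout`; they only show what the compiler does when K and V appear in no argument, and what "pushing the Any down" could look like.

```scala
object TypeParamSketch {
  // Hypothetical stand-in: K and V appear in no argument,
  // so the compiler cannot infer them from the call site.
  def scheduleTyped[K, V](name: String): List[(K, V)] = Nil

  // Alternative (assumed shape): keep the signature untyped and
  // use Any further down, where the key-value view is actually needed.
  def scheduleUntyped(name: String): List[(Any, Any)] = Nil

  // With no type arguments given, both parameters are inferred as Nothing.
  val inferred: List[(Nothing, Nothing)] = scheduleTyped("spout")

  // The caller can pin them explicitly instead.
  val explicit: List[(Any, Any)] = scheduleTyped[Any, Any]("spout")

  val untyped: List[(Any, Any)] = scheduleUntyped("spout")
}
```

The untyped variant avoids implying that every spout is key-value, which is the objection made above.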
| outputCollector: SpoutOutputCollector): Unit = { | ||
| val adapterCollector = new TransformingOutputCollector(outputCollector, _.get(0).asInstanceOf[JList[AnyRef]]) | ||
| self.open(conf, topologyContext, adapterCollector) | ||
| adapterCollector = new AggregatorOutputCollector(outputCollector, _.get(0).asInstanceOf[JList[AnyRef]], summerBuilder, summerShards) |
Let's make two OutputCollector classes. The flow of tuples will look like this:
spout: List((k, v)) ->
aggregatingOutputCollector: List(CMap) ->
transformingOutputCollector: List(Int, CMap)
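The two-stage flow proposed above could be sketched roughly as follows. This is not the PR's implementation: `CMap` is modelled as a plain immutable `Map`, the `Int` is assumed to be a shard index derived from the key's hash, and `aggregate` and `transform` are hypothetical names standing in for the two collector classes.

```scala
object CollectorFlowSketch {
  // Assumption: CMap is modelled here as an immutable Map.
  type CMap[K, V] = Map[K, V]

  // Stage 1 (aggregating collector): combine values that share a key.
  def aggregate[K, V](tuples: List[(K, V)])(plus: (V, V) => V): CMap[K, V] =
    tuples.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(plus(_, v)).getOrElse(v))
    }

  // Stage 2 (transforming collector): split the map by an assumed shard index.
  def transform[K, V](cmap: CMap[K, V], shards: Int): List[(Int, CMap[K, V])] =
    cmap.groupBy { case (k, _) => math.abs(k.hashCode % shards) }.toList
}
```

Keeping the stages separate means the aggregation logic can be tested without knowing how tuples are sharded or serialized downstream.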
| This is a wrapper method to call the emit with appropriate signature | ||
| based on the arguments. | ||
| */ | ||
| (messageIds.isEmpty, stream.isEmpty) match { |
This special handling of an empty stream name seems odd to me. What does an empty stream name signify? Can we use Option[String] instead of relying on a special value of the stream name?
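A minimal sketch of the Option[String] suggestion, assuming the match only selects between emit overloads. `chooseEmit` is a hypothetical helper that returns a label for the overload that would be called, so the example stays free of Storm dependencies.

```scala
object EmitDispatchSketch {
  // Hypothetical helper: None means "no explicit stream",
  // replacing the empty-string sentinel.
  def chooseEmit(stream: Option[String], messageIds: List[Any]): String =
    (messageIds.isEmpty, stream) match {
      case (true, None)     => "emit(values)"
      case (true, Some(s))  => s"emit($s, values)"
      case (false, None)    => "emit(values, messageIds)"
      case (false, Some(s)) => s"emit($s, values, messageIds)"
    }
}
```

With an Option the compiler checks that all four cases are handled, and an empty string is no longer given a hidden meaning.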
This PR adds aggregation capability to the spout. When FMMergeableWithSource is opted in and a summer follows, the map-side aggregation happens in the spout (localized).