
Re: Difference between BucketingSink and StreamingFileSink


Hi,

Sorry for wasting your time. I found the solution to my question about
bucketing by event time: a class that extends BucketAssigner does the
job:

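import java.text.SimpleDateFormat
import java.util.Date
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

// MyClass is my own record type; getTimestamp returns the event time in epoch milliseconds.
class SdcTimeBucketAssigner[T <: MyClass](prefix: String, formatString: String)
		extends BucketAssigner[T, String] {
	// SimpleDateFormat is not serialized with the assigner, so it is marked
	// @transient and re-created lazily on first use in each task.
	@transient private lazy val dateFormatter = new SimpleDateFormat(formatString)

	// The bucket ID is derived from the record's own timestamp, not from processing time.
	override def getBucketId(in: T, context: BucketAssigner.Context): String =
		s"$prefix${dateFormatter.format(new Date(in.getTimestamp))}"

	override def getSerializer: SimpleVersionedSerializer[String] =
		SimpleVersionedStringSerializer.INSTANCE
}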
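
For illustration, here is a minimal standalone sketch of the bucket IDs this logic produces, without any Flink dependency. The object name BucketIdSketch, the helper bucketId, the "dt=" prefix and the "yyyy-MM-dd/HH" pattern are all hypothetical and chosen only for the example. The sketch also pins the formatter to UTC, which is an assumption on my side: SimpleDateFormat otherwise uses the JVM default time zone, so the same event could land in different buckets on differently configured machines.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object BucketIdSketch {
  // Hypothetical helper mirroring the getBucketId logic above,
  // with the time zone fixed to UTC (an assumption, not part of the original class).
  def bucketId(prefix: String, formatString: String, timestampMillis: Long): String = {
    val formatter = new SimpleDateFormat(formatString)
    formatter.setTimeZone(TimeZone.getTimeZone("UTC"))
    s"$prefix${formatter.format(new Date(timestampMillis))}"
  }

  def main(args: Array[String]): Unit = {
    // 1546304400000L is 2019-01-01T01:00:00Z
    println(bucketId("dt=", "yyyy-MM-dd/HH", 1546304400000L)) // prints dt=2019-01-01/01
  }
}
```

All events whose timestamps fall within the same hour map to the same bucket ID, so they end up under the same directory regardless of when they are processed.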

Thanks and best regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/