
[jira] [Created] (FLINK-10478) Kafka Producer wrongly formats "%" for transaction ID


Obi Tetsuya created FLINK-10478:
-----------------------------------

             Summary: Kafka Producer wrongly formats "%" for transaction ID
                 Key: FLINK-10478
                 URL: https://issues.apache.org/jira/browse/FLINK-10478
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.4.2
         Environment: Flink 1.4.2

Scala 2.11.12

jdk1.8.0_162

Running on a local embedded Flink mini cluster (the same failure also occurred on a standalone cluster with different code)
            Reporter: Obi Tetsuya


The Kafka producer, when configured for exactly-once semantics, uses its task name to build the transactional ID. Because the producer passes that name directly as a format string, the job fails whenever the name contains "%".
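The failure can be reproduced without Flink or Kafka. The following is a minimal sketch, assuming (from the `String.format` frame in `TransactionalIdsGenerator.generateTransactionalId` in the stack trace) that the task name is concatenated with a `-%d` suffix and the result is used as the format string. `unsafeId` and `safeId` are hypothetical names for illustration, not connector API.

```scala
import java.util.IllegalFormatException

object TransactionalIdFormatDemo {
  // Hypothetical stand-in for the buggy pattern: the task name becomes part of the format string.
  def unsafeId(prefix: String, id: Long): String =
    String.format(prefix + "-%d", java.lang.Long.valueOf(id))

  // Hypothetical fix: build the ID by plain concatenation so the task name is never parsed by Formatter.
  def safeId(prefix: String, id: Long): String =
    prefix + "-" + id

  def main(args: Array[String]): Unit = {
    val taskName = "Map -> Sink: 100%"
    try {
      println(unsafeId(taskName, 0L))
    } catch {
      // MissingFormatWidthException, seen in the report, is a subclass of IllegalFormatException
      case e: IllegalFormatException =>
        println(s"unsafe: ${e.getClass.getSimpleName}: ${e.getMessage}")
    }
    println(s"safe: ${safeId(taskName, 0L)}") // prints "safe: Map -> Sink: 100%-0"
  }
}
```

Names without "%" format identically either way, which is why the bug only shows up for task names such as "100%".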

Code to reproduce:
{code:scala}
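import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExampleRunner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.enableCheckpointing(1000) // checkpointing is required for EXACTLY_ONCE transactions
    env.getConfig.disableSysoutLogging()
    env.setRestartStrategy(RestartStrategies.noRestart)

    val p = new java.util.Properties
    Map(
      "bootstrap.servers" -> "192.168.1.100:9092",
      "transaction.timeout.ms" -> (10 * 60 * 1000).toString
    ).foreach { case (k, v) => p.setProperty(k, v) }

    env
      .fromCollection(100 to 200)
      .map(_.toString)
      .addSink(new FlinkKafkaProducer011(
        "test",
        new KeyedSerializationSchemaWrapper(new SimpleStringSchema),
        p,
        Semantic.EXACTLY_ONCE))
      .name("100%") // the "%" ends up in the chained task name "Map -> Sink: 100%"
    env.execute()
  }
}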
{code}

Raised exception:
{code}
2018-10-02 17:00:12.918 [Map -> Sink: 100% (1/8)] INFO  o.a.flink.runtime.taskmanager.Task  - Map -> Sink: 100% (1/8) (25190aeccdce738afdd00e9320903d7b) switched from RUNNING to FAILED.
java.util.MissingFormatWidthException: %-%
	at java.util.Formatter$FormatSpecifier.checkText(Formatter.java:3040)
	at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2733)
	at java.util.Formatter.parse(Formatter.java:2560)
	at java.util.Formatter.format(Formatter.java:2501)
	at java.util.Formatter.format(Formatter.java:2455)
	at java.lang.String.format(String.java:2940)
	at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateTransactionalId(TransactionalIdsGenerator.java:91)
	at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToUse(TransactionalIdsGenerator.java:72)
	at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToAbort(TransactionalIdsGenerator.java:85)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:850)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
{code}
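The `%-%` in the exception message is consistent with a format string ending in `100%-%d`: `java.util.Formatter` reads `%-%` as a `%` conversion carrying the `-` (left-justify) flag, and that flag requires a width, hence `MissingFormatWidthException`. Assuming that format-string shape, one way to keep a format string but make it safe is to escape the name so that each `%` becomes `%%`, which `Formatter` emits as a literal percent sign. `escapedId` is a hypothetical helper for illustration, not part of the connector.

```scala
object EscapedPrefixDemo {
  // Hypothetical helper: assumes the ID is formatted as <task name> + "-%d".
  // Doubling each '%' turns it into a literal percent sign for java.util.Formatter.
  def escapedId(prefix: String, id: Long): String =
    String.format(prefix.replace("%", "%%") + "-%d", java.lang.Long.valueOf(id))

  def main(args: Array[String]): Unit = {
    println(escapedId("Map -> Sink: 100%", 7L)) // prints "Map -> Sink: 100%-7"
  }
}
```

Building the ID by plain string concatenation would avoid `Formatter` altogether; escaping is only needed if the format string is kept.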



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)