git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

How to test window


Hey guys,
i Incurred in situation and i need you help.

im trying Using unit test inorder to check my results,
first my  timeWindow is set for 15sec, but the assertyEquals doesnt wait for the window getting the answer,

so everything is telling me index 0 out of bounds (cuze its didnt get to place my object in the list yet)

thank you all!

 import org.apache.flink.annotation.Public;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.com.CameraEvent;
import org.com.StreamingJob;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class IntergrationTest extends AbstractTestBase {



@Test
public void test() throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// configure your test environment
env.setParallelism(1);

// values are collected in a static variable
CollectSink.values.clear();
LinkedList<CameraEvent> events = GenerateEvents();
env.fromCollection(events)
.keyBy(new StreamingJob.GetKey())
.timeWindow(Time.seconds(10))
.minBy("dateTime")
.addSink(new CollectSink());

env.execute("lala");
assertEquals(events.get(1), CollectSink.values.get(0));
}

private static LinkedList<CameraEvent> GenerateEvents() {
LinkedList<CameraEvent> linkedList;
CameraEvent cameraEvent;
linkedList = new LinkedList<>();
for (int i = 0; i < 2; i++) {

cameraEvent = new CameraEvent("123-123-12", 1, new Date(), "OUT", "CAR");

linkedList.add(cameraEvent);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return linkedList;
}
private static class CollectSink implements SinkFunction<CameraEvent> {

// must be static
public static final List<CameraEvent> values = new ArrayList<>();

@Override
public synchronized void invoke(CameraEvent value) throws Exception {
values.add(value);
}
}


}