From fba49a6eeca2397197da6007649f9f6b53880d50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=B5=9C=EC=9C=A0=EC=84=B1?= Date: Fri, 4 Dec 2020 23:58:31 +0900 Subject: [PATCH 1/6] =?UTF-8?q?=EB=A0=88=EB=94=94=EC=8A=A4=20uri=20?= =?UTF-8?q?=EB=B3=84=20=EC=97=B0=EA=B2=B0=20=ED=85=8C=EC=8A=A4=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lettuce/src/test/java/RedisClientUriTest.java | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 lettuce/src/test/java/RedisClientUriTest.java diff --git a/lettuce/src/test/java/RedisClientUriTest.java b/lettuce/src/test/java/RedisClientUriTest.java new file mode 100644 index 0000000..81bb8e3 --- /dev/null +++ b/lettuce/src/test/java/RedisClientUriTest.java @@ -0,0 +1,91 @@ +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisException; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class RedisClientUriTest { + + private RedisClient client; + private StatefulRedisConnection connection; + + @AfterEach + void tearDown() { + if (connection != null) { + connection.sync().flushall(); + connection.close(); + } + if (client != null) { + client.shutdown(); + } + this.connection = null; + this.client = null; + } + + @Test + @DisplayName("redisUri를 통해 레디스 연결이 가능하다.") + void name() { + //given + RedisURI redisUri = RedisURI.Builder.redis("localhost") + .withPassword("root") // 패스워드 + .withDatabase(2) // db 저장 + .build(); + + + client = RedisClient.create(redisUri); + connection = client.connect(); + + String uuid = UUID.randomUUID().toString(); + + //when + RedisCommands commands = connection.sync(); + commands.set("foo", uuid); + String value = commands.get("foo"); + + //then + assertThat(value).isEqualTo(uuid); + } + + @Test + @DisplayName("문자열 형태로 레디스 연결") + void name2() { + //given + RedisURI redisUri = RedisURI.create("redis://root@localhost/2"); + client = RedisClient.create(redisUri); + + StatefulRedisConnection connection = client.connect(); + + String uuid = UUID.randomUUID().toString(); + + //when + RedisCommands commands = connection.sync(); + commands.set("foo", uuid); + String value = commands.get("foo"); + + //then + assertThat(value).isEqualTo(uuid); + } + + @Test + @DisplayName("잘못된 비밀번호는 연결 실패한다.") + void name3() { + //given + RedisURI redisUri = RedisURI.create("redis://root-1@localhost/2"); + client = RedisClient.create(redisUri); + + //when + //then + assertThatThrownBy(client::connect) + .isInstanceOf(RedisException.class); + + } + +} From efd9fb39a86eb12064f7ec71e06078069939831f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=84=8E=E1=85=AC=E1=84=8B=E1=85=B2=E1=84=89=E1=85=A5?= =?UTF-8?q?=E1=86=BC?= Date: Sat, 19 Dec 2020 20:59:41 +0900 Subject: [PATCH 2/6] =?UTF-8?q?=EB=A0=88=EB=94=94=EC=8A=A4=20async=20?= =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 25 ++- gradle/wrapper/gradle-wrapper.properties | 2 +- .../src/test/java/CompletableFutureTest.java | 73 ++++++++ lettuce/src/test/java/RedisAsyncTest.java | 174 ++++++++++++++++++ settings.gradle | 1 + spring-data-redis/build.gradle | 3 + .../src/main/java/com/cys/Point.java | 25 +++ .../main/java/com/cys/PointRepository.java | 6 + .../src/main/java/com/cys/RedisConfig.java | 37 ++++ .../src/test/java/com/cys/PointTest.java | 25 +++ 10 files changed, 367 insertions(+), 4 deletions(-) create mode 100644 lettuce/src/test/java/CompletableFutureTest.java create mode 100644 lettuce/src/test/java/RedisAsyncTest.java create mode 100644 spring-data-redis/build.gradle create mode 100644 spring-data-redis/src/main/java/com/cys/Point.java create mode 100644 spring-data-redis/src/main/java/com/cys/PointRepository.java create mode 100644 spring-data-redis/src/main/java/com/cys/RedisConfig.java create mode 100644 spring-data-redis/src/test/java/com/cys/PointTest.java diff --git a/build.gradle b/build.gradle index be32ac1..333b34b 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,11 @@ +plugins { + id 'org.springframework.boot' version '2.3.4.RELEASE' + id 'io.spring.dependency-management' version '1.0.10.RELEASE' +} + + // 자바 모듈 -def javaProjects = [project(":lettuce")] +def javaProjects = [project(":lettuce"),project(":spring-data-redis")] configure(javaProjects) { apply plugin: "java" @@ -13,8 +19,9 @@ configure(javaProjects) { } dependencies { - testImplementation 'org.junit.jupiter:junit-jupiter-api:5.3.1' - testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.3.1' + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.2' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.6.2' + testCompile group: 'org.junit.jupiter', name: 'junit-jupiter-params', version: '5.6.2' testCompile group: 'org.assertj', name: 'assertj-core', version: '3.6.1' } @@ -23,3 +30,15 @@ configure(javaProjects) { } } + +def springProjects = [project(":spring-data-redis")] + +configure(springProjects) { + apply plugin: "org.springframework.boot" + apply plugin: "io.spring.dependency-management" + + + dependencies { + testCompile("org.springframework.boot:spring-boot-starter-test") + } +} diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 59f599e..0cd1f91 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ #Sat Nov 28 14:19:24 KST 2020 -distributionUrl=https\://services.gradle.org/distributions/gradle-6.1.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.4.1-all.zip distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStorePath=wrapper/dists diff --git a/lettuce/src/test/java/CompletableFutureTest.java b/lettuce/src/test/java/CompletableFutureTest.java new file mode 100644 index 0000000..c353076 --- /dev/null +++ b/lettuce/src/test/java/CompletableFutureTest.java @@ -0,0 +1,73 @@ +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +class CompletableFutureTest { + + @Test + @DisplayName("thenRun은 complete호출 되면 수행된다.") + void name() { + final CompletableFuture future = new CompletableFuture<>(); + + AtomicReference validText = new AtomicReference<>("one"); + future.thenRun(() -> validText.set("two")); + + assertThat(future.isDone()).isFalse(); + assertThat(validText.get()).isEqualTo("one"); + + future.complete("my value"); + assertThat(future.isDone()).isTrue(); + assertThat(validText.get()).isEqualTo("two"); + } + + @Test + @DisplayName("thenAccept complete으로 주입받는 value를 소비한다") + void name2() { + CompletableFuture future = new CompletableFuture<>(); + + AtomicReference validText = new AtomicReference<>("one"); + future.thenAccept(validText::set); + + assertThat(future.isDone()).isFalse(); + assertThat(validText.get()).isEqualTo("one"); + + future.complete("two"); + + assertThat(future.isDone()).isTrue(); + assertThat(validText.get()).isEqualTo("two"); + } + + @ParameterizedTest + @CsvSource({"300,400,one", "400,300,two"}) + void name3(int sleep1, int sleep2, String resultText) { + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + sleep(sleep1); + return "one"; + }); + + CompletableFuture otherFuture = CompletableFuture.supplyAsync(() -> { + sleep(sleep2); + return "two"; + }); + + AtomicReference validText = new AtomicReference<>(""); + future.acceptEither(otherFuture, validText::set).join(); + + assertThat(validText.get()).isEqualTo(resultText); + } + + private void sleep(int millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + e.getStackTrace(); + } + } + +} diff --git a/lettuce/src/test/java/RedisAsyncTest.java b/lettuce/src/test/java/RedisAsyncTest.java new file mode 100644 index 0000000..6a05761 --- /dev/null +++ b/lettuce/src/test/java/RedisAsyncTest.java @@ -0,0 +1,174 @@ +import io.lettuce.core.LettuceFutures; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import org.junit.jupiter.api.*; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class RedisAsyncTest { + private static final RedisClient CLIENT = RedisClient.create("redis://localhost"); + private static final StatefulRedisConnection CONNECTION = CLIENT.connect(); + private final RedisAsyncCommands commands = CONNECTION.async(); + + private final String key = "key"; + private final String value = "value"; + + @AfterAll + static void afterAll() { + CONNECTION.close(); + CLIENT.shutdown(); + } + + @BeforeEach + void setUp() { + CONNECTION.async().set(key, value); + + } + + @AfterEach + void tearDown() { + commands.flushall(); + } + + @Test + @DisplayName("complet이 호출될때까지 결과를 가져올 수 없다") + void name() { + RedisFuture future = commands.get(key); + future.thenAcceptAsync(value -> { + System.out.println(Thread.currentThread().getName()); + }); + + future.exceptionally(throwable -> { + if (throwable instanceof IllegalStateException) { + return "default value"; + } + + return "other default value"; + }); + } + + @Test + @DisplayName("thenAccept test") + void name2() throws InterruptedException { + //given + RedisFuture future = commands.get("key"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference validText = new AtomicReference<>("one"); + AtomicReference threadName = new AtomicReference<>("thread"); + + //when + future.thenAccept(value -> { + validText.set(value); + threadName.set(Thread.currentThread().getName()); + latch.countDown(); + }); + latch.await(); + + //then + assertThat(validText.get()).isEqualTo(value); +// assertThat(threadName.get()).contains("lettuce-nioEventLoop-"); + } + + @Test + @DisplayName("thenAcceptAsync test") + void name3() throws InterruptedException { + //given + RedisFuture future = commands.get("key"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference validText = new AtomicReference<>("one"); + AtomicReference threadName = new AtomicReference<>("thread"); + + //when + future.thenAcceptAsync(value -> { + validText.set(value); + threadName.set(Thread.currentThread().getName()); + latch.countDown(); + }); + latch.await(); + + //then + assertThat(validText.get()).isEqualTo(value); + assertThat(threadName.get()).contains("ForkJoinPool.commonPool-worker-"); + } + + @CsvSource({"MINUTES,true", "MILLISECONDS,false"}) + @ParameterizedTest + void name4(TimeUnit timeUnit, boolean result) { + //given + List> futures = new ArrayList<>(); + + //when + for (int i = 0; i < 10; i++) { + futures.add(commands.set("key-" + i, "value-" + i)); + } + + //then + assertThat(LettuceFutures.awaitAll(1, timeUnit, futures.toArray(new RedisFuture[0]))).isEqualTo(result); + + } + + @Test + void name5() throws InterruptedException { + RedisFuture future = commands.get("key"); + AtomicInteger value = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(1); + + future.thenApply(String::length) + .thenAccept(integer -> { + value.set(integer); + latch.countDown(); + }); + + latch.await(); + assertThat(value.get()).isEqualTo(5); + } + + @Test + void name6() { + assertThatThrownBy(() -> { + RedisFuture future = commands.get(key); + future.get(1, TimeUnit.MICROSECONDS); + }).isInstanceOf(TimeoutException.class); + } + + @Test + void name7() throws InterruptedException { + RedisFuture future = commands.get(key); + assertThat(future.await(1, TimeUnit.MICROSECONDS)).isFalse(); + } + + @Test + void name8() throws InterruptedException { + //given + AtomicReference result = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + RedisFuture future = commands.get(key); + + future.handle((value, throwable) -> { + if (throwable != null) { + return "fail"; + } + return "success"; + }) + .thenAccept(value -> { + result.set(value); + latch.countDown(); + }); + latch.await(); + + assertThat(result.get()).isEqualTo("success"); + } +} diff --git a/settings.gradle b/settings.gradle index 761acab..784a6ed 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,3 +1,4 @@ rootProject.name = 'learning-test-redis' include 'lettuce' +include 'spring-data-redis' diff --git a/spring-data-redis/build.gradle b/spring-data-redis/build.gradle new file mode 100644 index 0000000..b19e5ae --- /dev/null +++ b/spring-data-redis/build.gradle @@ -0,0 +1,3 @@ +dependencies { + compile("org.springframework.boot:spring-boot-starter-data-redis") +} diff --git a/spring-data-redis/src/main/java/com/cys/Point.java b/spring-data-redis/src/main/java/com/cys/Point.java new file mode 100644 index 0000000..02ef706 --- /dev/null +++ b/spring-data-redis/src/main/java/com/cys/Point.java @@ -0,0 +1,25 @@ +package com.cys; + +import org.springframework.data.annotation.Id; +import org.springframework.data.redis.core.RedisHash; + +@RedisHash(value = "testPoint", timeToLive = 60L) +public class Point { + + @Id + private String id; // userId + private Long point; + + public Point(String id, Long point) { + this.id = id; + this.point = point; + } + + public String getId() { + return id; + } + + public Long getPoint() { + return point; + } +} diff --git a/spring-data-redis/src/main/java/com/cys/PointRepository.java b/spring-data-redis/src/main/java/com/cys/PointRepository.java new file mode 100644 index 0000000..461ee71 --- /dev/null +++ b/spring-data-redis/src/main/java/com/cys/PointRepository.java @@ -0,0 +1,6 @@ +package com.cys; + +import org.springframework.data.repository.CrudRepository; + +public interface PointRepository extends CrudRepository { +} diff --git a/spring-data-redis/src/main/java/com/cys/RedisConfig.java b/spring-data-redis/src/main/java/com/cys/RedisConfig.java new file mode 100644 index 0000000..2ec3164 --- /dev/null +++ b/spring-data-redis/src/main/java/com/cys/RedisConfig.java @@ -0,0 +1,37 @@ +package com.cys; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.SocketOptions; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.repository.configuration.EnableRedisRepositories; + +import java.time.Duration; + +@Configuration +@EnableRedisRepositories +public class RedisConfig { + + @Bean + public RedisConnectionFactory redisConnectionFactory() { + RedisStandaloneConfiguration serverConfig = new RedisStandaloneConfiguration("localhost", 6379); + SocketOptions socketOptions = SocketOptions.builder().connectTimeout(Duration.ofSeconds(3)).build(); + ClientOptions clientOptions = ClientOptions.builder().socketOptions(socketOptions).build(); + LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder() + .commandTimeout(Duration.ofSeconds(5)) + .clientOptions(clientOptions).build(); + return new LettuceConnectionFactory(serverConfig, clientConfig); + } + + @Bean + public RedisTemplate redisTemplate() { + RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(redisConnectionFactory()); + return redisTemplate; + } +} diff --git a/spring-data-redis/src/test/java/com/cys/PointTest.java b/spring-data-redis/src/test/java/com/cys/PointTest.java new file mode 100644 index 0000000..c4e64de --- /dev/null +++ b/spring-data-redis/src/test/java/com/cys/PointTest.java @@ -0,0 +1,25 @@ +package com.cys; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest(classes = RedisConfig.class) +class PointTest { + + @Autowired + private PointRepository pointRepository; + + @Test + void name() { + //given + + //when + pointRepository.save(new Point("test1", 100L)); + pointRepository.save(new Point("test2", 100L)); +// pointRepository.findById("test1"); +// pointRepository.deleteById("test1"); + //then + + } +} \ No newline at end of file From 3f5e35c5c645ad8eba189fda86801377c31381d3 Mon Sep 17 00:00:00 2001 From: choiyusung Date: Sun, 3 Jan 2021 05:29:04 +0900 Subject: [PATCH 3/6] =?UTF-8?q?=EB=A6=AC=EC=95=A1=ED=8B=B0=EB=B8=8C=20?= =?UTF-8?q?=ED=94=84=EB=A1=9C=EA=B7=B8=EB=9E=98=EB=B0=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lettuce/src/main/java/ReactiveMain.java | 10 ++ lettuce/src/main/java/TempInfo.java | 28 +++++ lettuce/src/main/java/TempSubscriber.java | 29 +++++ lettuce/src/main/java/TempSubscription.java | 30 +++++ lettuce/src/test/java/ReactiveTest.java | 125 ++++++++++++++++++++ lettuce/src/test/java/RedisAsyncTest.java | 2 + 6 files changed, 224 insertions(+) create mode 100644 lettuce/src/main/java/ReactiveMain.java create mode 100644 lettuce/src/main/java/TempInfo.java create mode 100644 lettuce/src/main/java/TempSubscriber.java create mode 100644 lettuce/src/main/java/TempSubscription.java create mode 100644 lettuce/src/test/java/ReactiveTest.java diff --git a/lettuce/src/main/java/ReactiveMain.java b/lettuce/src/main/java/ReactiveMain.java new file mode 100644 index 0000000..b3b1163 --- /dev/null +++ b/lettuce/src/main/java/ReactiveMain.java @@ -0,0 +1,10 @@ +import org.reactivestreams.Publisher; + +public class ReactiveMain { + + public static void main(String[] args) { + Publisher publisher = sub -> sub.onSubscribe(new TempSubscription(sub, "seoul")); + publisher.subscribe(new TempSubscriber()); + } + +} diff --git a/lettuce/src/main/java/TempInfo.java b/lettuce/src/main/java/TempInfo.java new file mode 100644 index 0000000..c54f330 --- /dev/null +++ b/lettuce/src/main/java/TempInfo.java @@ -0,0 +1,28 @@ +import java.util.Random; + +public class TempInfo { + private static final Random RANDOM = new Random(); + + private final String town; + private final int temp; + + private TempInfo(String town, int temp) { + this.town = town; + this.temp = temp; + } + + static TempInfo fetch(String town) { + if (RANDOM.nextInt(10) == 0) { + throw new RuntimeException("error"); + } + return new TempInfo(town, RANDOM.nextInt(100)); + } + + @Override + public String toString() { + return "TempInfo{" + + "town='" + town + '\'' + + ", temp=" + temp + + '}'; + } +} diff --git a/lettuce/src/main/java/TempSubscriber.java b/lettuce/src/main/java/TempSubscriber.java new file mode 100644 index 0000000..bb84aa1 --- /dev/null +++ b/lettuce/src/main/java/TempSubscriber.java @@ -0,0 +1,29 @@ +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class TempSubscriber implements Subscriber { + + private Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + subscription.request(1); + } + + @Override + public void onNext(TempInfo tempInfo) { + System.out.println(tempInfo); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + System.out.println(t.getMessage()); + } + + @Override + public void onComplete() { + System.out.println("Done!"); + } +} diff --git a/lettuce/src/main/java/TempSubscription.java b/lettuce/src/main/java/TempSubscription.java new file mode 100644 index 0000000..fabb99a --- /dev/null +++ b/lettuce/src/main/java/TempSubscription.java @@ -0,0 +1,30 @@ +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class TempSubscription implements Subscription { + private final Subscriber subscriber; + private final String town; + + public TempSubscription(Subscriber subscriber, String town) { + this.subscriber = subscriber; + this.town = town; + } + + @Override + public void request(long n) { + for (long i = 0; i < n; i++) { + try { + TempInfo.fetch(town); + subscriber.onNext(TempInfo.fetch(town)); + } catch (Exception e) { + subscriber.onError(e); + break; + } + } + } + + @Override + public void cancel() { + subscriber.onComplete(); + } +} diff --git a/lettuce/src/test/java/ReactiveTest.java b/lettuce/src/test/java/ReactiveTest.java new file mode 100644 index 0000000..a387f7d --- /dev/null +++ b/lettuce/src/test/java/ReactiveTest.java @@ -0,0 +1,125 @@ +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.scheduler.Schedulers; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static java.time.temporal.ChronoUnit.MILLIS; +import static org.assertj.core.api.Assertions.assertThat; + +class ReactiveTest { + + @Test + void name() { + //given + + Flux flux = Flux.just("Ben", "Michael", "Mark"); + + //when + flux.subscribe(new Subscriber() { + private Subscription subscription; + + public void onSubscribe(Subscription s) { + this.subscription = s; + this.subscription.request(1); + } + + public void onNext(String s) { + System.out.println("Hello " + s + "!"); + subscription.request(1); + } + + public void onError(Throwable t) { + System.out.println("Hello "); + } + + public void onComplete() { + System.out.println("Completed"); + } + }); + + //then + } + + + @CsvSource({"2,2,0", "4,3,1"}) + @ParameterizedTest + void name2(int take, int eventCount, int result) { + //given + Set events = new HashSet<>(); + Set results = new HashSet<>(); + + //when + Flux.just("Ben", "Michael", "Mark") // + .doOnNext(events::add) + .doOnComplete(() -> results.add("Completed")) + .take(take) + .subscribe(); + + //then + assertThat(events.size()).isEqualTo(eventCount); + assertThat(results.size()).isEqualTo(result); + } + + @Test + void name3() { + //given + //when + String last = Flux.just("Ben", "Michael", "Mark") + .last() + .block(); + System.out.println(last); + + //then + + } + + + @Test + void name4() { + //given + + //when + List list = Flux.just("Ben", "Michael", "Mark").collectList().block(); + System.out.println(list); + //then + + } + + @Test + void name5() throws InterruptedException { + //given +// EmitterProcessor data = EmitterProcessor.create(1); +// data.subscribe(t -> System.out.println(t)); +// FluxSink sink = data.sink(); +// sink.next(10L); +// sink.next(11L); +// sink.next(12L); +// data.subscribe(t -> System.out.println(t)); +// sink.next(13L); +// sink.next(14L); +// sink.next(15L); +// + EmitterProcessor emitter = EmitterProcessor.create(); + FluxSink sink = emitter.sink(); + emitter.publishOn(Schedulers.single()) + .map(String::toUpperCase) +// .filter(s -> s.startsWith("HELLO")) + .delayElements(Duration.of(1000, MILLIS)) + .subscribe(System.out::println); + + sink.next("Hello World!"); + sink.next("Goodbye World"); + sink.next("Again"); + Thread.sleep(3000); + } +} diff --git a/lettuce/src/test/java/RedisAsyncTest.java b/lettuce/src/test/java/RedisAsyncTest.java index 6a05761..af5f14e 100644 --- a/lettuce/src/test/java/RedisAsyncTest.java +++ b/lettuce/src/test/java/RedisAsyncTest.java @@ -3,6 +3,7 @@ import io.lettuce.core.RedisFuture; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.api.reactive.RedisReactiveCommands; import org.junit.jupiter.api.*; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -21,6 +22,7 @@ public class RedisAsyncTest { private static final RedisClient CLIENT = RedisClient.create("redis://localhost"); private static final StatefulRedisConnection CONNECTION = CLIENT.connect(); + private final RedisReactiveCommands reactiveCommands = CONNECTION.reactive(); private final RedisAsyncCommands commands = CONNECTION.async(); private final String key = "key"; From 08006d0580214f636c673c6f7318d35b320a106a Mon Sep 17 00:00:00 2001 From: choiyusung Date: Sun, 24 Jan 2021 19:33:28 +0900 Subject: [PATCH 4/6] =?UTF-8?q?=EB=A6=AC=EC=95=A1=ED=8B=B0=EB=B8=8C=20?= =?UTF-8?q?=EC=8A=A4=ED=8A=B8=EB=A6=BC=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lettuce/src/main/java/TempSubscription.java | 1 - lettuce/src/test/java/ReactiveTest.java | 163 +++++++++----------- 2 files changed, 71 insertions(+), 93 deletions(-) diff --git a/lettuce/src/main/java/TempSubscription.java b/lettuce/src/main/java/TempSubscription.java index fabb99a..c8f3032 100644 --- a/lettuce/src/main/java/TempSubscription.java +++ b/lettuce/src/main/java/TempSubscription.java @@ -14,7 +14,6 @@ public TempSubscription(Subscriber subscriber, String town) { public void request(long n) { for (long i = 0; i < n; i++) { try { - TempInfo.fetch(town); subscriber.onNext(TempInfo.fetch(town)); } catch (Exception e) { subscriber.onError(e); diff --git a/lettuce/src/test/java/ReactiveTest.java b/lettuce/src/test/java/ReactiveTest.java index a387f7d..ec36551 100644 --- a/lettuce/src/test/java/ReactiveTest.java +++ b/lettuce/src/test/java/ReactiveTest.java @@ -1,125 +1,104 @@ +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; -import reactor.core.scheduler.Schedulers; -import java.time.Duration; -import java.util.HashSet; +import java.util.Arrays; import java.util.List; -import java.util.Set; +import java.util.Map; +import java.util.function.Function; -import static java.time.temporal.ChronoUnit.MILLIS; +import static java.util.stream.Collectors.toMap; import static org.assertj.core.api.Assertions.assertThat; class ReactiveTest { + private final List testData = Arrays.asList("Ben", "Michael", "Mark"); + private ReactiveChecker checker; + + @BeforeEach + void setUp() { + checker = new ReactiveChecker(testData); + } @Test void name() { //given - - Flux flux = Flux.just("Ben", "Michael", "Mark"); - - //when - flux.subscribe(new Subscriber() { - private Subscription subscription; - - public void onSubscribe(Subscription s) { - this.subscription = s; - this.subscription.request(1); - } - - public void onNext(String s) { - System.out.println("Hello " + s + "!"); - subscription.request(1); - } - - public void onError(Throwable t) { - System.out.println("Hello "); - } - - public void onComplete() { - System.out.println("Completed"); - } - }); - - //then + Flux.just(testData.toArray(new String[0])) + .doOnNext(s -> checker.check(s)) + .doOnComplete(() -> assertThat(checker.allMatch()).isTrue()) + .subscribe(); } - @CsvSource({"2,2,0", "4,3,1"}) - @ParameterizedTest - void name2(int take, int eventCount, int result) { - //given - Set events = new HashSet<>(); - Set results = new HashSet<>(); - - //when - Flux.just("Ben", "Michael", "Mark") // - .doOnNext(events::add) - .doOnComplete(() -> results.add("Completed")) - .take(take) + @Test + void name2() { + Flux.just(testData.toArray(new String[0])) + .doOnNext(s -> checker.check(s)) + .doOnComplete(() -> assertThat(checker.allMatch()).isFalse()) + .take(2) .subscribe(); - //then - assertThat(events.size()).isEqualTo(eventCount); - assertThat(results.size()).isEqualTo(result); + assertThat(checker.checkResult(testData.get(0))).isTrue(); + assertThat(checker.checkResult(testData.get(1))).isTrue(); + assertThat(checker.checkResult(testData.get(2))).isFalse(); } @Test void name3() { - //given - //when - String last = Flux.just("Ben", "Michael", "Mark") - .last() - .block(); - System.out.println(last); - - //then + Flux.just(testData.toArray(new String[0])) + .doOnNext(s -> { + if (s.equals(testData.get(0))) { + throw new RuntimeException(); + } + }) + .doOnError(s -> checker.check(testData.get(2))) + .take(2) + .subscribe(); + assertThat(checker.checkResult(testData.get(2))).isTrue(); } - @Test void name4() { //given - - //when - List list = Flux.just("Ben", "Michael", "Mark").collectList().block(); - System.out.println(list); - //then - + String last = Flux.just(testData.toArray(new String[0])).last().block(); + assertThat(testData.get(2)).isEqualTo(last); } @Test - void name5() throws InterruptedException { + void name5() { //given -// EmitterProcessor data = EmitterProcessor.create(1); -// data.subscribe(t -> System.out.println(t)); -// FluxSink sink = data.sink(); -// sink.next(10L); -// sink.next(11L); -// sink.next(12L); -// data.subscribe(t -> System.out.println(t)); -// sink.next(13L); -// sink.next(14L); -// sink.next(15L); -// - EmitterProcessor emitter = EmitterProcessor.create(); - FluxSink sink = emitter.sink(); - emitter.publishOn(Schedulers.single()) - .map(String::toUpperCase) -// .filter(s -> s.startsWith("HELLO")) - .delayElements(Duration.of(1000, MILLIS)) - .subscribe(System.out::println); - - sink.next("Hello World!"); - sink.next("Goodbye World"); - sink.next("Again"); - Thread.sleep(3000); + List testData = Flux.just(this.testData.toArray(new String[0])).collectList().block(); + assertThat(this.testData).isEqualTo(testData); + } + + private static class ReactiveChecker { + private final Map testMap; + + public ReactiveChecker(List testData) { + this.testMap = testData.stream() + .collect(toMap(Function.identity(), s -> 0)); + } + + public void check(String s) { + if (testMap.containsKey(s)) { + int value = testMap.get(s); + testMap.put(s, value + 1); + return; + } + testMap.put(s, 0); + } + + public boolean allMatch() { + for (Integer value : testMap.values()) { + if (value != 1) { + return false; + } + } + return true; + } + + public boolean checkResult(String s) { + return testMap.get(s) == 1; + } } } From fbc72d846d81d282cc9e073aae98c533f5c050bb Mon Sep 17 00:00:00 2001 From: choiyusung Date: Mon, 1 Feb 2021 02:28:28 +0900 Subject: [PATCH 5/6] =?UTF-8?q?=EB=A6=AC=EC=95=A1=ED=8B=B0=EB=B8=8C=20?= =?UTF-8?q?=EB=A0=88=EB=94=94=EC=8A=A4=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lettuce/src/test/java/ReactiveRedisTest.java | 143 +++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 lettuce/src/test/java/ReactiveRedisTest.java diff --git a/lettuce/src/test/java/ReactiveRedisTest.java b/lettuce/src/test/java/ReactiveRedisTest.java new file mode 100644 index 0000000..8bcd744 --- /dev/null +++ b/lettuce/src/test/java/ReactiveRedisTest.java @@ -0,0 +1,143 @@ +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.reactive.RedisReactiveCommands; +import io.lettuce.core.api.sync.RedisCommands; +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.assertj.core.api.Assertions.assertThat; + +class ReactiveRedisTest { + + private final RedisClient client = RedisClient.create("redis://localhost"); + private final RedisReactiveCommands commands = client.connect().reactive(); + private final RedisCommands syncCommands = client.connect().sync(); + + + @AfterEach + void tearDown() { + syncCommands.flushall(); + } + + @Test + @DisplayName("리액티브는 응답처리를 별도의 스레드로 작업한") + void name() throws InterruptedException { + //given + final String key = "key"; + final String txValue = "value"; + final CountDownLatch latch = new CountDownLatch(1); + final Map resultMap = new HashMap<>(); + syncCommands.set(key, txValue); + + //when + commands.get(key).subscribe(value -> { + System.out.println(Thread.currentThread().getName()); + resultMap.put(key, value); + latch.countDown(); + }); + + //then + latch.await(); + assertThat(resultMap.get(key)).isEqualTo(txValue); + } + + @Test + void name2() throws InterruptedException { + //given + final List texts = Lists.newArrayList(); + for (int i = 0; i < 1000; i++) { + texts.add(i + ""); + } + final List results = Lists.newArrayList(); + final CountDownLatch latch = new CountDownLatch(texts.size()); + texts.forEach(text -> syncCommands.set(text, text)); + + //when + Flux.just(texts.toArray(new String[0])). + flatMap(commands::get). + subscribe(value -> { + System.out.println(Thread.currentThread().getName()); + results.add(value); + latch.countDown(); + }); + latch.await(); + //then + + texts.forEach(expected -> assertThat(results).contains(expected)); + } + + @Test + @DisplayName("lists stream") + void name3() throws InterruptedException { + //given + final List texts = Lists.newArrayList(); + for (int i = 0; i < 1000; i++) { + String text = Integer.toString(i); + texts.add(text); + syncCommands.set(text, text); + } + final CountDownLatch latch = new CountDownLatch(texts.size()); + + //when + Flux.just(texts.toArray(new String[0])) + .flatMap(commands::get) + .flatMap(value -> { + System.out.println(Thread.currentThread().getName()); + return commands.rpush("result", value); + }) + .subscribe(value -> latch.countDown()); + + //then + latch.await(); + List results = syncCommands.lrange("result", 0, texts.size()); + assertThat(texts.containsAll(results)).isTrue(); + } + + @Test + @DisplayName("값이 없으면 무시") + void name4() throws InterruptedException { + //given + final List texts = Lists.newArrayList("Ben", "Michael"); + final List results = Lists.newArrayList(); + final CountDownLatch latch = new CountDownLatch(texts.size()); + texts.forEach(value -> syncCommands.set(value, value)); + + //when + Flux.just("Ben", "Michael", "Mark") + .flatMap(commands::get) + .doOnNext(value -> { + System.out.println(Thread.currentThread().getName()); + results.add(value); + latch.countDown(); + }) + .subscribe(); + + //then + latch.await(); + assertThat(texts.containsAll(results)).isTrue(); + } + + @Test + @DisplayName("트랜잭션 적용") + void name5() throws InterruptedException { + //given + CountDownLatch latch = new CountDownLatch(1); + //when + commands.multi().subscribe(s -> { + commands.set("key", "1").doOnNext(System.out::println).subscribe(); + commands.incr("key").doOnNext(System.out::println).subscribe(); + commands.exec().subscribe(s1 -> latch.countDown()); + }); + + //then + latch.await(); + assertThat(syncCommands.get("key")).isEqualTo("2"); + } +} From f2d1c56961cbfc712473ad79bc9d038e0b2a1307 Mon Sep 17 00:00:00 2001 From: choiyusung Date: Mon, 8 Feb 2021 01:51:13 +0900 Subject: [PATCH 6/6] =?UTF-8?q?redis=20pub=20sub=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lettuce/src/test/java/PubSubTest.java | 77 ++++++++++++++++++++ lettuce/src/test/java/ReactiveRedisTest.java | 27 ++++++- 2 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 lettuce/src/test/java/PubSubTest.java diff --git a/lettuce/src/test/java/PubSubTest.java b/lettuce/src/test/java/PubSubTest.java new file mode 100644 index 0000000..db2c7cb --- /dev/null +++ b/lettuce/src/test/java/PubSubTest.java @@ -0,0 +1,77 @@ +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.pubsub.RedisPubSubAdapter; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; +import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import org.junit.jupiter.api.Test; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import static org.assertj.core.api.Assertions.assertThat; + +class PubSubTest { + private final RedisClient client = RedisClient.create("redis://localhost"); + private final StatefulRedisPubSubConnection connection = client.connectPubSub(); + + + @Test + void name() throws InterruptedException { + //given + Set messageSets = new HashSet<>(); + CountDownLatch latch = new CountDownLatch(2); + connection.addListener(new RedisPubSubAdapter() { + public void message(String channel, String message) { + System.out.println(Thread.currentThread().getName()); + messageSets.add(message); + latch.countDown(); + } + }); + + RedisPubSubCommands sync = connection.sync(); + sync.subscribe("channel"); + + //when + StatefulRedisConnection sender = client.connect(); + + sender.sync().publish("channel", "Message-1"); + sender.sync().publish("channel", "Message-2"); + + latch.await(); + + //then + assertThat(messageSets.size()).isEqualTo(2); + assertThat(messageSets).contains("Message-1", "Message-2"); + } + + @Test + void name2() throws InterruptedException { + //given + Set messageSets = new HashSet<>(); + CountDownLatch latch = new CountDownLatch(2); + StatefulRedisPubSubConnection connection = client.connectPubSub(); + + RedisPubSubReactiveCommands reactive = connection.reactive(); + reactive.subscribe("channel").subscribe(); + + reactive.observeChannels() + .doOnNext(patternMessage -> { + System.out.println(Thread.currentThread().getName()); + messageSets.add(patternMessage.getMessage()); + latch.countDown(); + }) + .subscribe(); + //when + StatefulRedisConnection sender = client.connect(); + + sender.sync().publish("channel", "Message-1"); + sender.sync().publish("channel", "Message-2"); + latch.await(); + + //then + assertThat(messageSets.size()).isEqualTo(2); + assertThat(messageSets).contains("Message-1", "Message-2"); + } +} diff --git a/lettuce/src/test/java/ReactiveRedisTest.java b/lettuce/src/test/java/ReactiveRedisTest.java index 8bcd744..bffd493 100644 --- a/lettuce/src/test/java/ReactiveRedisTest.java +++ b/lettuce/src/test/java/ReactiveRedisTest.java @@ -23,7 +23,7 @@ class ReactiveRedisTest { @AfterEach void tearDown() { - syncCommands.flushall(); +// syncCommands.flushall(); } @Test @@ -128,6 +128,7 @@ void name4() throws InterruptedException { @DisplayName("트랜잭션 적용") void name5() throws InterruptedException { //given + syncCommands.flushall(); CountDownLatch latch = new CountDownLatch(1); //when commands.multi().subscribe(s -> { @@ -140,4 +141,28 @@ void name5() throws InterruptedException { latch.await(); assertThat(syncCommands.get("key")).isEqualTo("2"); } + + @Test + @DisplayName("스레드 변경 타이밍 확인") + void name6() { + //given + final List texts = Lists.newArrayList("Ben", "Michael", "Mark"); + texts.forEach(value -> syncCommands.set(value, value)); + + //when + Flux.just("Ben", "Michael", "Mark") + .filter(s -> { + System.out.println(Thread.currentThread().getName()); + return s.startsWith("M"); + + }) + .flatMap(commands::get) + .flatMap(s -> { + System.out.println(Thread.currentThread().getName()); + return Flux.just(s); + }) + .subscribe(value -> System.out.println("Got value: " + value)); + //then + + } }