Useful Eclipse snippets
name: boot
@SpringBootApplication
public class StarterApplication {
    public static void main(String[] args) {
        SpringApplication.run(StarterApplication.class, args);
    }
}
name: restsetup
/**
1. restcon or restconwithclass // create the basic controller
2. getmap / getmapid // GET mapping (all / by id)
3. postmap // POST mapping
4. putmapid // PUT by id
5. deletemapid / deletemap // DELETE by id / delete all
*/
name:restcon
@CrossOrigin(origins = "http://localhost:8081")
@RestController
@RequestMapping("/api")
name: restconwithclass
@CrossOrigin(origins = "http://localhost:8081")
@RestController
@RequestMapping("/api")
public class RestCon {
}
name: getmap
@GetMapping("/tutorials")
public ResponseEntity<String> getAllTutorials(@RequestParam(required = false)
String title) {
return new ResponseEntity<>(title, HttpStatus.OK);
}
name: getmapid
@GetMapping("/tutorials/{id}")
public ResponseEntity<String> getTutorialById(@PathVariable("id") String id) {
return new ResponseEntity<>(id, HttpStatus.OK);
}
name: postmap
@PostMapping("/test")
public ResponseEntity<String> createTest(@RequestBody String tutorial) {
return new ResponseEntity<>(tutorial, HttpStatus.CREATED);
}
name: putmapid
@PutMapping("/tutorials/{id}")
public ResponseEntity<String> updateTutorial(@PathVariable("id") long id,
@RequestBody String tutorial) {
return new ResponseEntity<>(tutorial, HttpStatus.OK);
}
name: deletemapid
@DeleteMapping("/tutorials/{id}")
public ResponseEntity<HttpStatus> deleteTutorial(@PathVariable("id") long id) {
    return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
name: deletemap
@DeleteMapping("/test")
public ResponseEntity<HttpStatus> deleteAllTutorials() {
    return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
name: uploadfile
/**
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
max file size and max request size
**/
//https://howtodoinjava.com/spring-boot/spring-boot-file-upload-rest-api/
//https://www.javaguides.net/2018/11/spring-boot-2-file-upload-and-download-rest-api-tutorial.html
private Path fileStorageLocation;

@PostMapping("/uploadfile")
public Map<String, Object> uploadFile(@RequestParam("file") MultipartFile file) throws Exception {
    Map<String, Object> map = new HashMap<>();
    this.fileStorageLocation = Paths.get("./uploadir").toAbsolutePath().normalize();
    Files.createDirectories(this.fileStorageLocation); // make sure the upload directory exists
    String fileName = StringUtils.cleanPath(file.getOriginalFilename());
    try {
        // Check if the file's name contains invalid characters
        if (fileName.contains("..")) {
            throw new Exception("Sorry! Filename contains invalid path sequence " + fileName);
        }
        // Copy file to the target location (replacing an existing file with the same name)
        Path targetLocation = this.fileStorageLocation.resolve(fileName);
        Files.copy(file.getInputStream(), targetLocation, StandardCopyOption.REPLACE_EXISTING);
        // Populate the map with file details
        map.put("fileName", file.getOriginalFilename());
        map.put("fileSize", file.getSize());
        map.put("fileContentType", file.getContentType());
        // File upload is successful
        map.put("message", "File upload done");
        return map;
    } catch (IOException ex) {
        throw new Exception("Could not store file " + fileName + ". Please try again!", ex);
    }
}
name: uploadfiles
//https://howtodoinjava.com/spring-boot/spring-boot-file-upload-rest-api/
@PostMapping("/uploadfiles")
public List<Map<String, Object>> uploadMultipleFiles(
        @RequestParam("files") MultipartFile[] files) throws IOException {
    return Arrays.stream(files)
            .map(file -> {
                try {
                    return uploadFile(file);
                } catch (Exception e) {
                    // Wrap checked exceptions so the lambda stays a valid Function
                    throw new RuntimeException(e);
                }
            })
            .collect(Collectors.toList());
}
name: uploadfilesasync
//https://dev.to/mspilari/asynchronous-file-upload-in-java-with-spring-4dnd
//https://howtodoinjava.com/spring-boot/spring-boot-file-upload-rest-api/
/*
create this configuration
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean
public Executor taskExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("AsyncFileUploader - ");
executor.initialize();
return executor;
}
}
@Service
class FileStorageManager {
@SneakyThrows
@Async
public void save(MultipartFile file) {
Thread.sleep(new Random().nextLong(4000, 8000));
System.out.println(file.getOriginalFilename() + " is uploaded at " +
LocalDateTime.now());
}
}
*/
@Autowired
FileStorageManager fileStorageManager;
@Async
@PostMapping("/uploadasync")
public CompletableFuture<ResponseEntity<String>> handleConcurrentFilesUpload(
@RequestParam("files") MultipartFile[] files) throws IOException
{
// Handle empty file error
    if (files.length == 0) {
        return CompletableFuture.completedFuture(
                ResponseEntity.badRequest().body("No files submitted"));
    }
// File upload process is submitted
else {
for (MultipartFile file : files) {
fileStorageManager.save(file);
//TODO: access and store each file into file storage
}
        return CompletableFuture.completedFuture(ResponseEntity.ok("File upload started"));
}
}
name: uploadfileexception
@RestControllerAdvice
public class CustomExceptionHandler {
@ExceptionHandler(MaxUploadSizeExceededException.class)
public ResponseEntity<String> handleMaxUploadSizeExceeded() {
    return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("File size exceeds the limit.");
}
@ExceptionHandler(MultipartException.class)
public ResponseEntity<String> handleMultipartException() {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("Error occurred during file upload.");
}
}
===================================== JPA snippet =====================================
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
https://medium.com/@aishuguna5/spring-boot-series-entity-manager-d228ad666da3
https://www.bezkoder.com/spring-boot-jpa-h2-example/
https://gist.ly/youtube-summarizer/mastering-spring-data-jpa-complete-guide-to-relationships-and-queries
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
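The properties above target the in-memory H2 database. Since mysql-connector-java is
also on the classpath, a MySQL setup would look roughly like this (a sketch assuming a
local MySQL instance with database testdb, user root, and password secret):
spring.datasource.url=jdbc:mysql://localhost:3306/testdb
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=secret
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true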
name: jpasetup
/**
h2 db
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
1. jpasimple // create jpa simple
2. jpareposimple // create jpa repo simple
3. jpaservicesimple // create service with jpa
4. jpaserviceimplsimple // create impl service with jpa
5. jpasimplerest // create jpa simple rest related with jpa
**/
name:jpasimple
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class Author {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Integer id;
private String firstName;
private String lastName;
private String email;
private Integer age;
}
name: jpareposimple
public interface AuthorRepository extends JpaRepository<Author, Integer> { }
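// JpaRepository can also derive queries from method names, so finder methods need no
// implementation. A sketch (these method names are illustrative, not saved snippets):
public interface AuthorRepository extends JpaRepository<Author, Integer> {
    // Spring Data generates the query from the method name.
    List<Author> findByLastName(String lastName);
    List<Author> findByAgeGreaterThan(Integer age);
    Optional<Author> findByEmail(String email);
}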
name:jpaservicesimple
public interface AuthorService {
Author saveAuthor(Author author);
List<Author> fetchAuthorList();
Author updateAuthor(Author author, Integer authorId);
void deleteAuthorById(Integer authorId);
}
name: jpaserviceimplsimple
@Service
public class AuthorServiceImpl implements AuthorService {
    @Autowired
    private AuthorRepository authorRepository;

    @Override
    public Author saveAuthor(Author author) {
        // Saves and returns the author entity.
        return authorRepository.save(author);
    }

    @Override
    public List<Author> fetchAuthorList() {
        // Retrieves and returns a list of all author entities.
        return authorRepository.findAll();
    }

    @Override
    public Author updateAuthor(Author author, Integer authorId) {
        // Loads the existing author, copies the new values, and saves.
        Author existing = authorRepository.findById(authorId)
                .orElseThrow(() -> new IllegalArgumentException("Author not found: " + authorId));
        existing.setFirstName(author.getFirstName());
        existing.setLastName(author.getLastName());
        existing.setEmail(author.getEmail());
        existing.setAge(author.getAge());
        return authorRepository.save(existing);
    }

    @Override
    public void deleteAuthorById(Integer authorId) {
        // Deletes the author entity by its ID.
        authorRepository.deleteById(authorId);
    }
}
name:jpasimplerest
@CrossOrigin(origins = "http://localhost:8081")
@RestController
@RequestMapping("/api")
public class AuthorRestController {
@Autowired
private AuthorService authorService;
@PostMapping("/authors")
public Author saveAuthors(@Valid @RequestBody Author author) {
return authorService.saveAuthor(author);
}
@GetMapping("/authors")
public List<Author> fetchAuthorList() {
return authorService.fetchAuthorList();
}
    @PutMapping("/authors/{id}")
    public Author updateAuthor(@RequestBody Author author, @PathVariable("id") Integer authorId) {
        return authorService.updateAuthor(author, authorId);
    }

    @DeleteMapping("/authors/{id}")
    public String deleteAuthorById(@PathVariable("id") Integer authorId) {
        authorService.deleteAuthorById(authorId);
        return "Deleted Successfully";
    }
}
name:jpaonetomanyuni
/**
https://medium.com/@bectorhimanshu/spring-data-jpa-one-to-many-bidirectional-
relationship-mapping-1dd7088bec3a
https://medium.com/@bectorhimanshu/spring-data-jpa-one-to-many-unidirectional-
relationship-mapping-2a18ed985a5d
https://codescoddler.medium.com/why-you-should-avoid-the-unidirectional-onetomany-
association-in-jpa-c00ae83aaeba
https://github.com/Java-Techie-jt/spring-data-jpa-one2many-join-example/blob/
master/src/main/java/com/javatechie/jpa/repository/CustomerRepository.java
https://thorben-janssen.com/best-practices-many-one-one-many-associations-mappings/
unidirectional: produces 3 tables (a junction table is generated unless @JoinColumn is used)
advice: avoid this; use the bidirectional mapping instead
author()
author_book() junction table
book()
statement: one author writes many books
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class Author {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Integer authorId;
private String name;
private String lastName;
private String email;
private Integer age;
--- choose only one of the following --------------------------
@OneToMany(cascade = CascadeType.ALL)
private List<Book> books = new ArrayList<Book>();
@OneToMany(cascade = CascadeType.ALL)
@JoinColumn(name = "author_id")
private List<Book> books = new ArrayList<Book>();
@OneToMany(cascade = CascadeType.ALL)
@JoinColumn(name = "author_id",referencedColumnName = "authorId")
private List<Book> books = new ArrayList<Book>();
-----------------------------------------------------
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class Book {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Integer bookId;
private String title;
private String bookcode;
}
// service author repository
public interface AuthorRepository extends JpaRepository<Author,Integer> { };
// service book repository
public interface BookRepository extends JpaRepository<Book,Integer> { };
// Create authors
Author author1 = new Author();
author1.setName("Deepak Kumar");
Author author2 = new Author();
author2.setName("Katty Janes");
// Create books
Book book1 = new Book();
book1.setTitle("Welcome to CSS");
Book book2 = new Book();
book2.setTitle("Javascript Programming");
// Associate books with authors
author1.getBooks().add(book1);
author2.getBooks().add(book2);
// Save authors (and cascade to save associated books)
authorRepository.save(author1);
authorRepository.save(author2);
//rest
dto
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class AuthorRequest {
private Author author;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class AuthorResponse {
private String name;
private String bookName;
}
--- rest -------------------------------------------
@Autowired
private AuthorRepository authorRepository;
@Autowired
private BookRepository bookRepository;
@PostMapping("/placeOrder")
public Author placeOrder(@RequestBody AuthorRequest request){
return authorRepository.save(request.getAuthor());
}
@GetMapping("/findAllAuthors")
public List<Author> findAllAuthors(){
return authorRepository.findAll();
}
**/
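Since the advice above is to prefer the bidirectional mapping, here is a minimal sketch
of that variant (an illustration, not a saved snippet; field names follow the entities above):
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Author {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Integer authorId;
    private String name;
    // Inverse side: mappedBy points at Book.author, so no junction table is created.
    @OneToMany(mappedBy = "author", cascade = CascadeType.ALL)
    private List<Book> books = new ArrayList<>();
}
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Book {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Integer bookId;
    private String title;
    // Owning side: Book holds the author_id foreign key.
    @ManyToOne
    @JoinColumn(name = "author_id")
    private Author author;
}
Note: Lombok's @Data on both sides of a bidirectional association can recurse in
toString()/hashCode(); excluding the association field from those is a common precaution.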
name:jpamanytooneuni
/***
https://medium.com/@bectorhimanshu/spring-data-jpa-one-to-many-bidirectional-
relationship-mapping-1dd7088bec3a
https://dev.to/sovannaro/spring-boot-jpa-many-to-one-mapping-5g80
https://vladmihalcea.com/manytoone-jpa-hibernate/
https://thorben-janssen.com/hibernate-tips-map-bidirectional-many-one-association/
note: the unidirectional many-to-one needs only 2 tables (the many side holds the foreign key)
advice: use the unidirectional many-to-one only when navigating from the many side is enough
statement: many books are written by one author
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class Book {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Integer bookId;
    private String title;
    private String bookcode;
    // Owning side of the association: Book holds the author_id foreign key.
    @ManyToOne
    @JoinColumn(name = "author_id")
    private Author author;
}
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class Author {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Integer authorId;
private String name;
private String lastName;
private String email;
private Integer age;
}
**/
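A short usage sketch for this mapping (AuthorRepository and BookRepository assumed to
follow the same JpaRepository pattern as the earlier snippets):
// Persist the author first, then attach it to each book.
Author author = new Author();
author.setName("Deepak Kumar");
authorRepository.save(author);

Book book = new Book();
book.setTitle("Welcome to CSS");
book.setAuthor(author); // fills the author_id foreign key on book
bookRepository.save(book);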
name: jpamanytomany (Not Yet)
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class Course {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Integer id;
private String name;
private String description;
@ManyToMany(mappedBy = "courses")
private List<Author> authors = new ArrayList<>();
}
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class Author {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Integer id;
private String firstName;
private String lastName;
private String email;
private Integer age;
@ManyToMany
@JoinTable(
name = "author_course",
joinColumns = @JoinColumn(name = "author_id"),
inverseJoinColumns = @JoinColumn(name = "course_id")
)
private List<Course> courses = new ArrayList<>();
}
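A short usage sketch for the many-to-many mapping (repositories assumed to follow the
same JpaRepository pattern as above):
// Persist the course first, since no cascade is configured on the mapping.
Course course = new Course();
course.setName("Spring Data JPA");
courseRepository.save(course);

Author author = new Author();
author.setFirstName("Katty");
// Author is the owning side; rows in author_course are written from Author.courses.
author.getCourses().add(course);
course.getAuthors().add(author); // keep the inverse side consistent in memory
authorRepository.save(author);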
name:repoentity (Not Yet)
@Repository
public class TutorialRepository {
@Autowired
private EntityManager entityManager;
}
name: repoentitylist (Not Yet)
@Repository
public class TutorialRepository {
@PersistenceContext
private EntityManager entityManager;
public List<Tutorial> findByTitleContaining(String title) {
    TypedQuery<Tutorial> query = entityManager.createQuery(
            "SELECT t FROM Tutorial t WHERE LOWER(t.title) LIKE LOWER(CONCAT('%', :title, '%'))",
            Tutorial.class);
    return query.setParameter("title", title).getResultList();
}
public List<Tutorial> findByPublished(boolean isPublished) {
    TypedQuery<Tutorial> query = entityManager.createQuery(
            "SELECT t FROM Tutorial t WHERE t.published = :isPublished",
            Tutorial.class);
    return query.setParameter("isPublished", isPublished).getResultList();
}
public List<Tutorial> findByTitleAndPublished(String title, boolean isPublished) {
    TypedQuery<Tutorial> query = entityManager.createQuery(
            "SELECT t FROM Tutorial t WHERE LOWER(t.title) LIKE LOWER(CONCAT('%', :title, '%')) AND t.published = :isPublished",
            Tutorial.class);
    return query.setParameter("title", title).setParameter("isPublished", isPublished).getResultList();
}
}
name: repoentitycrud (Not Yet)
@Repository
public class TutorialRepository {
@PersistenceContext
private EntityManager entityManager;
@Transactional
public Tutorial save(Tutorial tutorial) {
entityManager.persist(tutorial);
return tutorial;
}
public Tutorial findById(long id) {
    return entityManager.find(Tutorial.class, id);
}
@Transactional
public Tutorial update(Tutorial tutorial) {
    // merge() returns the managed instance; return that, not the detached argument
    return entityManager.merge(tutorial);
}
@Transactional
public Tutorial deleteById(long id) {
Tutorial tutorial = findById(id);
if (tutorial != null) {
entityManager.remove(tutorial);
}
return tutorial;
}
@Transactional
public int deleteAll() {
Query query = entityManager.createQuery("DELETE FROM Tutorial");
return query.executeUpdate();
}
}
================================================= Kafka ======================================================
name: kafkaproducersetup
/**
1. kafkaproducerproperties
2. kafkaproducerjsonserializer1
3. kafkaproducerjsonserializer2
4. kafkaproducerconfig
5. kafkaproducerservices
6. kafkaproducerrest
Docker
wsl --list
wsl -d Ubuntu-24.04
cd /mnt/c/appasdocker/kafka2
docker-compose up -d
**/
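A minimal docker-compose.yml sketch for the broker (an assumption, not the actual file in
/mnt/c/appasdocker/kafka2; the official apache/kafka image runs a single-node KRaft broker
on localhost:9092 by default):
services:
  kafka:
    image: apache/kafka:latest
    ports:
      - "9092:9092"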
name: kafkaproducerproperties
@Configuration
@ConfigurationProperties(prefix = "kafka.producer")
@Getter
@Setter
public class KafkaProducerProperties {
private String bootstrapServers;
private String topicName;
}
/**
in application.properties
kafka.producer.bootstrapServers=localhost:9092
kafka.producer.topicName=orders
**/
name: kafkaproducerjsonserializer1
public class KafkaJsonSerializerConfig extends AbstractConfig {
    public static final String JSON_INDENT_OUTPUT = "json.indent.output";
    public static final boolean JSON_INDENT_OUTPUT_DEFAULT = false;
    public static final String JSON_INDENT_OUTPUT_DOC =
            "Whether JSON output should be indented (\"pretty-printed\")";

    private static ConfigDef config;

    static {
        config = new ConfigDef().define(JSON_INDENT_OUTPUT, ConfigDef.Type.BOOLEAN,
                JSON_INDENT_OUTPUT_DEFAULT, ConfigDef.Importance.LOW, JSON_INDENT_OUTPUT_DOC);
    }

    public KafkaJsonSerializerConfig(Map<?, ?> props) {
        super(config, props);
    }
}
name: kafkaproducerjsonserializer2
/**
com.kafka.producer.api.config.kafka.serializer.KafkaJsonGenericSerializer
*/
public class KafkaJsonGenericSerializer<T> implements Serializer<T> {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(KafkaJsonGenericSerializer.class.getName());
    private ObjectMapper objectMapper;

    public KafkaJsonGenericSerializer() {}

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        configure(new KafkaJsonSerializerConfig(config));
    }

    protected void configure(KafkaJsonSerializerConfig config) {
        boolean prettyPrint = config.getBoolean(KafkaJsonSerializerConfig.JSON_INDENT_OUTPUT);
        this.objectMapper = new ObjectMapper();
        this.objectMapper.configure(SerializationFeature.INDENT_OUTPUT, prettyPrint);
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            if (data == null) {
                LOGGER.error("Null received at serializing");
                return null;
            }
            LOGGER.info("Serializing orderItem ...");
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing OrderItem to byte[]");
        }
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}
name: kafkaproducerconfig
@Configuration
public class KafkaProducerConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Autowired
    private KafkaProducerProperties kafkaProducerProperties;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaProducerProperties.getBootstrapServers());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "com.kafka.producer.api.config.kafka.serializer.KafkaJsonGenericSerializer");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    KafkaTemplate<String, Object> kafkaProducerTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord,
                                  RecordMetadata recordMetadata) {
                LOGGER.info("ACK from ProducerListener message: {} offset: {}",
                        producerRecord.value().toString(), recordMetadata.offset());
            }

            @Override
            public void onError(ProducerRecord<String, Object> producerRecord,
                                RecordMetadata recordMetadata, Exception exception) {
                ProducerListener.super.onError(producerRecord, recordMetadata, exception);
                LOGGER.error("Error from ProducerListener message: {} offset: {}",
                        producerRecord.value(), recordMetadata.offset());
                LOGGER.error("Error from ProducerListener exception: {}", exception.getMessage());
            }
        });
        return kafkaTemplate;
    }
}
name: kafkaproducerservices
// https://github.com/tufangorel/spring-boot-3-apache-kafka-producer-consumer/
blob/main/kafka-producer-rest-api/src/main/java/com/kafka/producer/api/service/
OrderItemService.java
@Service
public class TestProducerService {
private KafkaTemplate<String, Object> kafkaProducerTemplate;
private KafkaProducerProperties kafkaProducerProperties;
private static final Logger LOGGER =
LoggerFactory.getLogger(TestProducerService.class.getName());
public TestProducerService(KafkaTemplate<String, Object>
kafkaProducerTemplate,KafkaProducerProperties kafkaProducerProperties){
this.kafkaProducerTemplate = kafkaProducerTemplate;
this.kafkaProducerProperties = kafkaProducerProperties;
}
    public void sendToTopic(String message) { // the payload can be any serializable object
        String topic = kafkaProducerProperties.getTopicName();
        String orderId = UUID.randomUUID().toString();
        CompletableFuture<SendResult<String, Object>> future =
                kafkaProducerTemplate.send(topic, orderId, message);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                LOGGER.info("The record with key : {}, value : {} is produced successfully to offset {}",
                        result.getProducerRecord().key(),
                        result.getProducerRecord().value(),
                        result.getRecordMetadata().offset());
            } else {
                // result is null on failure, so log the attempted key/value instead
                LOGGER.error("The record with key: {}, value: {} cannot be processed! caused by {}",
                        orderId, message, ex.getMessage());
            }
        });
}
}
name: kafkaproducerrest
@CrossOrigin(origins = "http://localhost:8081")
@RestController
@RequestMapping("/api")
public class KafkaProducerRestController {
    @Autowired
    private TestProducerService testProducerService;

    @PostMapping("/publish")
    public ResponseEntity<String> publish(@Valid @RequestBody String message) {
        testProducerService.sendToTopic(message);
        return new ResponseEntity<>(message, HttpStatus.OK);
    }
}
--------------------------------- kafkaconsumer -----------------------------------------------------------
name: kafkaconsumersetup
/***
1. kafkaconsumerdeserializer
2. kafkaconsumerconfig
3. kafkaconsumerlistener
**/
name: kafkaconsumerdeserializer
public class KafkaJsonGenericDeserializer<T> implements Deserializer<T>
{
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaJsonGenericDeserializer.class.getName());
private ObjectMapper objectMapper;
private Class<T> type;
public KafkaJsonGenericDeserializer() {}
@Override
public void configure(Map<String, ?> props, boolean isKey)
{
configure(new KafkaJsonDeserializerConfig(props), isKey);
}
protected void configure(KafkaJsonDecoderConfig config, Class<T> type)
{
this.objectMapper = new ObjectMapper();
this.type = type;
boolean failUnknownProperties =
config.getBoolean(KafkaJsonDeserializerConfig.FAIL_UNKNOWN_PROPERTIES);
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
failUnknownProperties);
}
@SuppressWarnings("unchecked")
private void configure(KafkaJsonDeserializerConfig config, boolean isKey)
{
if (isKey)
{
configure(config, (Class<T>)
config.getClass(KafkaJsonDeserializerConfig.JSON_KEY_TYPE));
} else
{
configure(config, (Class<T>)
config.getClass(KafkaJsonDeserializerConfig.JSON_VALUE_TYPE));
}
}
@Override
public T deserialize(String ignored, byte[] bytes)
{
if (bytes == null || bytes.length == 0)
{
LOGGER.error("Null received at deserializing");
return null;
}
try
{
LOGGER.info("Deserializing...");
return objectMapper.readValue(bytes, type);
} catch (Exception e)
{
            throw new SerializationException("Error when deserializing byte[] to OrderItem");
}
}
protected Class<T> getType()
{
return type;
}
@Override
public void close() {
Deserializer.super.close();
}
}
public class KafkaJsonDeserializerConfig extends KafkaJsonDecoderConfig {
public static final String JSON_KEY_TYPE = "json.key.type";
public static final String JSON_KEY_TYPE_DEFAULT = Object.class.getName();
public static final String JSON_KEY_TYPE_DOC =
"Classname of the type that the message key should be deserialized to";
public static final String JSON_VALUE_TYPE = "json.value.type";
public static final String JSON_VALUE_TYPE_DEFAULT = Object.class.getName();
    public static final String JSON_VALUE_TYPE_DOC =
            "Classname of the type that the message value should be deserialized to";
private static ConfigDef config;
static {
config = baseConfig()
.define(JSON_KEY_TYPE, ConfigDef.Type.CLASS, JSON_KEY_TYPE_DEFAULT,
ConfigDef.Importance.MEDIUM, JSON_KEY_TYPE_DOC)
.define(JSON_VALUE_TYPE, ConfigDef.Type.CLASS,
JSON_VALUE_TYPE_DEFAULT,
ConfigDef.Importance.MEDIUM, JSON_VALUE_TYPE_DOC);
}
    public KafkaJsonDeserializerConfig(Map<?, ?> props) {
        super(config, props);
    }
}
public class KafkaJsonDecoderConfig extends AbstractConfig
{
public static final String FAIL_UNKNOWN_PROPERTIES =
"json.fail.unknown.properties";
public static final boolean FAIL_UNKNOWN_PROPERTIES_DEFAULT = true;
    public static final String FAIL_UNKNOWN_PROPERTIES_DOC =
            "Whether to fail deserialization if unknown JSON properties are encountered";
public KafkaJsonDecoderConfig(Map<?, ?> props)
{
super(baseConfig(), props);
}
protected KafkaJsonDecoderConfig(ConfigDef config, Map<?, ?> props)
{
super(config, props);
}
protected static ConfigDef baseConfig()
{
return new ConfigDef().define(FAIL_UNKNOWN_PROPERTIES,
ConfigDef.Type.BOOLEAN, FAIL_UNKNOWN_PROPERTIES_DEFAULT, ConfigDef.Importance.LOW,
FAIL_UNKNOWN_PROPERTIES_DOC);
}
}
name: kafkaconsumerconfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaConsumerConfig.class.getName());
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
// Creating a map of string-object type
Map<String, Object> config = new HashMap<>();
// Adding the Configuration
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"com.kafka.consumer.api.config.kafka.deserializer.KafkaJsonGenericDeserializer");
// Returning message in JSON format
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new KafkaJsonGenericDeserializer<>());
}
// Creating a Listener
@Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListener() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
name: kafkaconsumerlistener
@Component
public class KafkaListenerTest {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(KafkaListenerTest.class.getName());

    @KafkaListener(topics = "orders", groupId = "group_id", containerFactory = "kafkaListener")
    public void consumeOrderItem(ConsumerRecord<String, Object> consumerRecord) {
        // SLF4J uses {} placeholders, not printf-style %s/%d
        LOGGER.info("topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                consumerRecord.topic(), consumerRecord.partition(),
                consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
        LOGGER.info("OrderItem received from Kafka broker = " + consumerRecord.value());
        try {
            String value = (String) consumerRecord.value();
            LOGGER.info("OrderItem received from Kafka broker as Java Object = " + value);
        } catch (Exception e) {
            LOGGER.info(e.getMessage());
        }
    }
}
--------------------------- kafkaconsumerpool --------------------------------------------------------
https://www.baeldung.com/java-kafka-consumer-api-read
https://www.geeksforgeeks.org/apache-kafka-create-consumer-using-java/
https://github.com/puneetchhabra22/scalable-notification-system/blob/main/PushNConsumer/src/main/java/com/puneetchhabra/PushNConsumer/consumer/PriorityAwarePartitionConsumer.java
https://medium.com/@shandilya.prashant/priority-based-rate-limiting-with-kafka-spring-boot-c2c34ef99cc2
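No snippet body exists here yet; a minimal poll-loop sketch with the plain Kafka consumer
API, in the spirit of the links above (topic and group names are placeholders):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("orders"));
    while (true) {
        // poll() returns whatever records arrived within the timeout
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
}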