What is the Splitter Aggregator pattern?
A message is split into small independent pieces, each piece is processed individually, and the processed pieces are then grouped and handled together as a single result.
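Before bringing in Camel, the idea can be sketched in plain Java. This is only an illustration under stated assumptions: the class and method names (`SplitAggregateSketch`, `priceOf`, `totalPrice`) are hypothetical, and the prices are placeholder values chosen for the sketch.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; it does not use Camel.
class SplitAggregateSketch {

    // "Process" one piece independently: look up a price by item type.
    // The prices are assumed values for this sketch.
    static double priceOf(String itemType) {
        return "Book".equals(itemType) ? 30.0 : 500.0;
    }

    // Split the message into pieces, process each one, then aggregate the results.
    static double totalPrice(List<String> itemTypes) {
        double total = 0.0;
        for (String type : itemTypes) {     // split: one piece at a time
            double price = priceOf(type);   // process the piece on its own
            total += price;                 // aggregate into a single result
        }
        return total;
    }

    public static void main(String[] args) {
        // prints 530.0
        System.out.println(totalPrice(Arrays.asList("Book", "Phone")));
    }
}
```

In Camel, the loop body becomes a route and the running total becomes an aggregation strategy.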
In this example, we will create an order with two different types of items. Each item is processed individually to get its price, and the total price of all items in the order is then calculated using an aggregation strategy. The application is built from the following components.
1. Item Processor component
2. Order Items Aggregator
3. Java DSL Router
4. Camel context Configuration
5. Data models
6. Test the Application
Step 1 : Item Processor component
ItemSvc.java
package com.kswaughs.split;

import com.kswaughs.split.beans.Item;

public class ItemSvc {

    public Item processBook(Item item) throws InterruptedException {

        System.out.println("handle book Item:" + item);
        item.setPrice(30);
        System.out.println("book Item processed");

        return item;
    }

    public Item processPhone(Item item) throws InterruptedException {

        System.out.println("handle phone Item:" + item);
        item.setPrice(500);
        System.out.println("phone Item processed");

        return item;
    }
}
Step 2 : Order Items Aggregator
OrderItemStrategy.java
package com.kswaughs.split;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

import com.kswaughs.split.beans.Item;
import com.kswaughs.split.beans.Order;

public class OrderItemStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        if (oldExchange == null) {

            Item newItem = newExchange.getIn().getBody(Item.class);
            System.out.println("Aggregate first item: " + newItem);

            Order currentOrder = new Order();
            currentOrder.setId("ORD" + System.currentTimeMillis());
            List<Item> currentItems = new ArrayList<Item>();

            currentItems.add(newItem);
            currentOrder.setItems(currentItems);
            currentOrder.setTotalPrice(newItem.getPrice());

            newExchange.getIn().setBody(currentOrder);

            // the first time we aggregate we only have the new exchange,
            // so we just return it
            return newExchange;
        }

        Order order = oldExchange.getIn().getBody(Order.class);
        Item newItem = newExchange.getIn().getBody(Item.class);

        System.out.println("Aggregate old items: " + order);
        System.out.println("Aggregate new item: " + newItem);

        order.getItems().add(newItem);

        double totalPrice = order.getTotalPrice() + newItem.getPrice();
        order.setTotalPrice(totalPrice);

        // return old as this is the one that has all the items gathered until now
        return oldExchange;
    }
}
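The strategy above relies on one contract: the first call receives null as the old exchange, and every later call receives whatever the previous call returned. The sketch below imitates that call sequence with plain lists instead of Camel `Exchange` objects. The class `AggregateContractSketch` and its methods are hypothetical stand-ins, not Camel API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the aggregate(oldExchange, newExchange) contract.
class AggregateContractSketch {

    // 'old' is null on the first call; afterwards it is the value
    // returned by the previous call.
    static List<Double> aggregate(List<Double> old, double newPrice) {
        if (old == null) {
            old = new ArrayList<Double>();   // first piece: start the result
        }
        old.add(newPrice);                   // later pieces: fold into the result
        return old;
    }

    // Adds up all prices collected so far.
    static double sum(List<Double> prices) {
        double total = 0.0;
        for (double price : prices) {
            total += price;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Double> acc = null;
        // Imitate the splitter handing over one processed piece at a time.
        for (double price : new double[] {30.0, 500.0}) {
            acc = aggregate(acc, price);
        }
        // prints [30.0, 500.0] total=530.0
        System.out.println(acc + " total=" + sum(acc));
    }
}
```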
Step 3 : Java DSL Router
OrderRouter.java
package com.kswaughs.split;

import org.apache.camel.builder.RouteBuilder;

public class OrderRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("direct:processOrder")
            .split(body().method("getItems"), new OrderItemStrategy())
            // each split message is sent to this route to be processed
            .to("direct:processItem")
        .end();

        from("direct:processItem")
            .choice()
                .when(body().method("getType").isEqualTo("Book"))
                    .to("bean:itemService?method=processBook")
                .otherwise()
                    .to("bean:itemService?method=processPhone");
    }
}
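The second route is a content-based router: the item type decides which bean method is invoked. A point worth noticing is that `otherwise()` catches every type that is not "Book", not only "Phone". The plain-Java sketch below mirrors that decision; the class `ChoiceSketch` and its `route` method are hypothetical and exist only to illustrate the branching.

```java
// Hypothetical mirror of the choice()/when()/otherwise() block in the route.
class ChoiceSketch {

    // Returns the name of the ItemSvc method the route would pick.
    static String route(String itemType) {
        if ("Book".equals(itemType)) {
            return "processBook";    // when(type == "Book")
        }
        return "processPhone";       // otherwise(): every other type lands here
    }

    public static void main(String[] args) {
        System.out.println(route("Book"));    // prints processBook
        System.out.println(route("Phone"));   // prints processPhone
        System.out.println(route("Laptop"));  // prints processPhone
    }
}
```

If more item types were added, an explicit `when` branch per type would avoid pricing an unknown item as a phone.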
Step 4 : Camel context Configuration
camel-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:cxf="http://camel.apache.org/schema/cxf"
    xmlns:jaxrs="http://cxf.apache.org/jaxrs"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
        http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <bean id="orderRouter" class="com.kswaughs.split.OrderRouter" />
    <bean id="itemService" class="com.kswaughs.split.ItemSvc" />

    <camelContext id="orderCtx" xmlns="http://camel.apache.org/schema/spring">
        <routeBuilder ref="orderRouter" />
    </camelContext>

</beans>
Step 5 : Data models
Data model of Item
Item.java
package com.kswaughs.split.beans;

public class Item {

    public Item(String id, String name, String type) {
        this.id = id;
        this.name = name;
        this.type = type;
    }

    private String id;

    private String name;

    private String type;

    private double price;

    public String getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    public String getType() {
        return type;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Item [id=");
        builder.append(id);
        builder.append(", name=");
        builder.append(name);
        builder.append(", type=");
        builder.append(type);
        builder.append(", price=");
        builder.append(price);
        builder.append("]");
        return builder.toString();
    }
}
Data model of Order
Order.java
package com.kswaughs.split.beans;

import java.util.List;

public class Order {

    private String id;

    private List<Item> items;

    private double totalPrice;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public List<Item> getItems() {
        return items;
    }

    public void setItems(List<Item> items) {
        this.items = items;
    }

    public double getTotalPrice() {
        return totalPrice;
    }

    public void setTotalPrice(double totalPrice) {
        this.totalPrice = totalPrice;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Order [id=");
        builder.append(id);
        builder.append(", items=");
        builder.append(items);
        builder.append(", totalPrice=");
        builder.append(totalPrice);
        builder.append("]");
        return builder.toString();
    }
}
Step 6 : Start the camel context and test your application
OrderApp.java
package com.kswaughs.split;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.kswaughs.split.beans.Item;
import com.kswaughs.split.beans.Order;

public class OrderApp {

    public static void main(String[] args) {

        try {
            ApplicationContext springCtx = new ClassPathXmlApplicationContext(
                    "camel-context.xml");

            CamelContext context = springCtx.getBean("orderCtx",
                    CamelContext.class);

            context.start();

            ProducerTemplate producerTemplate = context.createProducerTemplate();

            List<Item> items = new ArrayList<Item>();
            items.add(new Item("1", "Camel in Action book", "Book"));
            items.add(new Item("2", "Apple IPhone8", "Phone"));

            Order myOrder = new Order();
            myOrder.setItems(items);

            Order respOrder = producerTemplate.requestBody(
                    "direct:processOrder", myOrder, Order.class);

            System.out.println("resp order:" + respOrder);

            context.stop();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Console Output
handle book Item:Item [id=1, name=Camel in Action book, type=Book, price=0.0]
book Item processed
Aggregate first item: Item [id=1, name=Camel in Action book, type=Book, price=30.0]
handle phone Item:Item [id=2, name=Apple IPhone8, type=Phone, price=0.0]
phone Item processed
Aggregate old items: Order [id=ORD1463125872691, items=[Item [id=1, name=Camel in Action book, type=Book, price=30.0]], totalPrice=30.0]
Aggregate new item: Item [id=2, name=Apple IPhone8, type=Phone, price=500.0]
resp order:Order [id=ORD1463125872691, items=[Item [id=1, name=Camel in Action book, type=Book, price=30.0], Item [id=2, name=Apple IPhone8, type=Phone, price=500.0]], totalPrice=530.0]
Maven dependencies
pom.xml
<properties>
    <camelspring.version>2.16.0</camelspring.version>
    <spring.version>4.1.6.RELEASE</spring.version>
</properties>

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version>${camelspring.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-cxf</artifactId>
    <version>${camelspring.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring</artifactId>
    <version>${camelspring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
</dependency>