31.5.13

ZooKeeper Performance (Simple test)

I tried to create 10,000 EPHEMERAL nodes, and it took 11 seconds, so that is roughly 1,000 nodes per second. Listing all 10,000 children took 24 milliseconds. The time to list the children is acceptable, but creating children takes longer than I would have expected.

On a slower system, a Linux box with a two-core
Intel(R) Core(TM)2 CPU         T5600  @ 1.83GHz

it took 123 seconds to finish creating the nodes.

The create operation is much faster with the asynchronous API. It took 1,464 milliseconds to finish creating 10K znodes locally, and 3,384 (or in one run 1,868) milliseconds to delete those nodes.
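For reference, a minimal sketch of the asynchronous create path with the plain ZooKeeper client looks roughly like this (my own sketch, not the exact benchmark code; the connect string and paths are placeholders):

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class AsyncCreateBenchmark {

    public static void main(String[] args) throws Exception {
        final int count = 10000;
        final CountDownLatch done = new CountDownLatch(count);

        // In real code you would pass a Watcher and wait for the connection event.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);

        StringCallback callback = new StringCallback() {
            public void processResult(int rc, String path, Object ctx, String name) {
                done.countDown(); // runs on the client's event thread when the server replies
            }
        };

        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            // The asynchronous create returns immediately; results arrive via the callback.
            // Assumes the parent node /bench already exists.
            zk.create("/bench/node-" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL, callback, null);
        }
        done.await();
        System.out.println("async create of " + count + " znodes took "
                + (System.currentTimeMillis() - start) + " ms");
        zk.close();
    }
}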

Official ZooKeeper Performance Report
Official ZooKeeper Performance Report Measuring Under Various Loads

How to Run ZooInspector

Simply running zooInspector.cmd doesn't work. I did the following and it worked for me:

1. Create a pom.xml file like this

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>nonam</groupId>
    <artifactId>noname</artifactId>
    <version>0.0.1</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.5</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>

2. run mvn dependency:copy-dependencies
3. run the following command:

java -cp zookeeper-3.4.5-ZooInspector.jar;lib/*;target/dependency/* org.apache.zookeeper.inspector.ZooInspector

Put it into a .cmd file if you like.

My STS Slows Down a Lot Over Time

I don't know why. When the system restarts, it looks fine. But after running for several days, it becomes very slooooooooooow. It takes minutes just to open a pom.xml file. Restarting STS itself seems to help, but I still need to wait about 2-3 seconds.

== Update ==
Disabled many plugins. Looks better now.

Very simple log4j.properties

http://www.beaconhill.com/blog/?p=38

There are some minor errors in the original post. Here is a version I think is better.

# Set root logger to INFO and add an appender called A1.
log4j.rootLogger=INFO, A1

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
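As a quick sanity check of the pattern, a class logging through this configuration produces output shaped roughly like the comment below (the class name and message are placeholders):

import org.apache.log4j.Logger;

public class Log4jDemo {

    private static final Logger LOG = Logger.getLogger(Log4jDemo.class);

    public static void main(String[] args) {
        // With the ConversionPattern above this prints something like:
        //   0    [main] INFO  Log4jDemo  - starting up
        // (%r = ms since startup, %t = thread, %p = level, %c = category, %x = NDC, %m = message)
        LOG.info("starting up");
    }
}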

30.5.13

Setting up a Storm Cluster

Setting up a Storm cluster · nathanmarz/storm Wiki · GitHub


  1. Set up a Zookeeper cluster
  2. Install dependencies on Nimbus and worker machines
  3. Download and extract a Storm release to Nimbus and worker machines
  4. Fill in mandatory configurations into storm.yaml
  5. Launch daemons under supervision using "storm" script and a supervisor of your choice

Set up a Single Node Zookeeper Cluster

  • Storm uses Zookeeper for coordinating the cluster. Zookeeper is not used for message passing, so the load Storm places on Zookeeper is quite low. Single node Zookeeper clusters should be sufficient for most cases.
I agree with this statement, at least for first-time users.




Scala

I think it is time to learn some new language(s).

Scala-IDE

A Wise Man Said

You can learn a lot from experiments that don't work out or don't work out like you expect.   Don't get down if something fails, often times that failure will lead to a breakthrough next time around.

-- By Another Jeff, And Thank you!!

Kafka


Examples:

29.5.13

Using Solaris Studio with Karaf

SOL_STUDIO_HOME=/opt/SolarisStudio12.3-linux-x86-bin/solarisstudio12.3/
export LD_LIBRARY_PATH=${SOL_STUDIO_HOME}/lib/amd64:${JRE_HOME}/lib/amd64:$LD_LIBRARY_PATH
COLLECT=${SOL_STUDIO_HOME}/bin/amd64/collect

$COLLECT -d "$KARAF_HOME/../profiler" -j on "$JAVA"  ....

Aggregator


Aggregator in EIP

  • Splitter is useful to break out a single message into a sequence of sub-messages that can be processed individually. Likewise, a Recipient List or a Publish-Subscribe Channel is useful to forward a request message to multiple recipients in parallel in order to get multiple responses to choose from. In most of these scenarios, the further processing depends on successful processing of the sub-messages. For example, we want to select the best bid from a number of vendor responses or we want to bill the client for an order after all items have been pulled from the warehouse.
This is the context. Splitter is the opposite pattern to Aggregator.

I can understand "to get multiple responses to choose from", but I don't understand the concept of a "sub-message".
  • How do we combine the results of individual, but related messages so that they can be processed as a whole?

But? Oh yeah.


  • Use a stateful filter, an Aggregator, to collect and store individual messages until a complete set of related messages has been received. Then, the Aggregator publishes a single message distilled from the individual messages.
  • The Aggregator is a special Filter that receives a stream of messages and identifies messages that are correlated. Once a complete set of messages has been received (more on how to decide when a set is 'complete' below), the Aggregator collects information from each correlated message and publishes a single, aggregated message to the output channel for further processing.

Self-starting Aggregator

  • When an Aggregator receives a message that cannot be associated with an existing aggregate, it will create a new aggregate. Therefore, the Aggregator does not need prior knowledge of the aggregates that it may produce. Accordingly, we call this variant a Self-starting Aggregator.
  • The Aggregator may have to deal with the situation that an incoming message belongs to an aggregate that has already been closed out, i.e. after the aggregate message has been published. In order to avoid starting a new aggregate, the Aggregator needs to keep a list of aggregates that have been closed out. 
    • We (also) need to provide a mechanism to purge this list periodically so that it does not grow indefinitely.
    • This assumes that we can make some basic assumptions about the time frame in which related messages will arrive.

Control Bus

  • Allowing the Aggregator to listen on a specific control channel which allows the manual purging of all active aggregates or a specific one.
    • This feature can be useful if we want to recover from an error condition without having to restart the Aggregator component.
    • Allowing the Aggregator to publish a list of active aggregates to a special channel upon request can be a very useful debugging feature.
    • Both functions are excellent examples of the kind of features typically incorporated into a Control Bus.

Initialized Aggregator

Recipient List

Composed Message Processor

Scatter-Gather


Aggregator in Spring Integration

I just realized that I don't know how to use Aggregator.

How Does Splitter-Aggregator Work?


This article confused me. It seems that <aggregator/> doesn't need any parameters and it just works! Let this be my starting point.

I created a very simple integration workflow like this:

<int:channel id="input"/>
<int:chain input-channel="input">
    <int:splitter expression="payload.split(',')"/>
    <int:aggregator/>
    <int-stream:stdout-channel-adapter append-newline="true"/>
</int:chain>

Within the test program:

input.send(MessageBuilder.withPayload("1,2,3,4").build());

And it works!

So I printed out the messages:

These were the messages after being split:

[Payload=1][Headers={timestamp=1369799663894, id=720bb91e-a570-42f2-b088-59acc563e284, correlationId=612a3c7c-6cc4-4d60-bb91-d007200fad4f, sequenceSize=4, sequenceNumber=1}]
[Payload=2][Headers={timestamp=1369799663896, id=b6d33028-8e40-46a7-8421-62f57cf13967, correlationId=612a3c7c-6cc4-4d60-bb91-d007200fad4f, sequenceSize=4, sequenceNumber=2}]
[Payload=3][Headers={timestamp=1369799663896, id=e91ffbe6-cb43-4191-8cea-cbb7a18053c6, correlationId=612a3c7c-6cc4-4d60-bb91-d007200fad4f, sequenceSize=4, sequenceNumber=3}]
[Payload=4][Headers={timestamp=1369799663896, id=e8001450-547a-41aa-9dd4-ae474196c011, correlationId=612a3c7c-6cc4-4d60-bb91-d007200fad4f, sequenceSize=4, sequenceNumber=4}]

And this was the message after aggregation:

[Payload=[1, 2, 3, 4]][Headers={timestamp=1369799663897, id=c4a661ad-c32d-47e5-9cfe-2e6dcf2c4dc1, correlationId=612a3c7c-6cc4-4d60-bb91-d007200fad4f}]

  • correlationId
  • sequenceSize
  • sequenceNumber

The API for performing splitting consists of one base class, AbstractMessageSplitter, which is a MessageHandler implementation, encapsulating features which are common to splitters, such as filling in the appropriate message headers CORRELATION_ID, SEQUENCE_SIZE, and SEQUENCE_NUMBER on the messages that are produced. This enables tracking down the messages and the results of their processing (in a typical scenario, these headers would be copied over to the messages that are produced by the various transforming endpoints), and use them, for example, in a Composed Message Processor scenario.

5.4.2. Functionality
The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed complete. At that point, the Aggregator will create a single message by processing the whole group, and will send the aggregated message as output.

Implementing an Aggregator requires providing the logic to perform the aggregation (i.e., the creation of a single message from many). Two related concepts are correlation and release.

Correlation determines how messages are grouped for aggregation. In Spring Integration correlation is done by default based on the MessageHeaders.CORRELATION_ID message header. Messages with the same MessageHeaders.CORRELATION_ID will be grouped together. However, the correlation strategy may be customized to allow other ways of specifying how the messages should be grouped together by implementing a CorrelationStrategy (see below).

To determine the point at which a group of messages is ready to be processed, a ReleaseStrategy is consulted. The default release strategy for the Aggregator will release a group when all messages included in a sequence are present, based on the MessageHeaders.SEQUENCE_SIZE header. This default strategy may be overridden by providing a reference to a custom ReleaseStrategy implementation.

It is not difficult to write a very simple program that just generates messages matching the Aggregator's default behavior:

int maxNum = 100;

for (int m = 0; m < 10; ++m) {
    for (int k = 0; k < maxNum; ++k) {
        input.send(MessageBuilder.withPayload(String.valueOf(m + k))
                .setCorrelationId(String.valueOf(m))
                .setSequenceNumber(m * 100 + k)
                .setSequenceSize(maxNum)
                .build());
    }
}

But what if I need to handle more complex situations? How could I use ReleaseStrategy and CorrelationStrategy?
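Below is a minimal sketch of what custom strategies could look like, assuming Spring Integration 2.x package names; the group-by-payload-prefix and fixed-size rules are invented examples. They would then be referenced from <int:aggregator/> via its correlation-strategy and release-strategy attributes.

import org.springframework.integration.Message;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.store.MessageGroup;

// Groups messages by the first character of their String payload (made-up rule,
// assumes non-empty String payloads).
public class PrefixCorrelationStrategy implements CorrelationStrategy {
    public Object getCorrelationKey(Message<?> message) {
        return message.getPayload().toString().substring(0, 1);
    }
}

// Releases a group as soon as it holds 5 messages, regardless of SEQUENCE_SIZE.
class FixedSizeReleaseStrategy implements ReleaseStrategy {
    public boolean canRelease(MessageGroup group) {
        return group.size() >= 5;
    }
}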




  • The AggregatingMessageHandler (subclass of AbstractCorrelatingMessageHandler) is a MessageHandler implementation, encapsulating the common functionalities of an Aggregator (and other correlating use cases)
    • correlating messages into a group to be aggregated
    • maintaining those messages in a MessageStore until the group can be released
    • deciding when the group can be released
    • aggregating the released group into a single message
    • recognizing and responding to an expired group

I can find the corresponding implementations for the first 4 functionalities, but not for the 5th one. One thing that confused me here: !(!messageGroup.isComplete() && messageGroup.canAdd(message)).

!(!messageGroup.isComplete() && messageGroup.canAdd(message))
==> messageGroup.isComplete() || !messageGroup.canAdd(message)

What does messageGroup.isComplete() mean?
  • True if the group is complete (i.e. no more messages are expected to be added)
This makes sense to me now. So if the message group is not expecting any new messages, then a new message will be sent to the discardChannel.

But wait, is a complete message group an expired group? How do they define an Expired Group? That was something that bothered me a lot in my previous testing.

Here is some related information:

  • Setting expire-groups-upon-completion to true (default is false) removes the entire group and any new messages, with the same correlation id as the removed group, will form a new group.
  • Partial sequences can be released by using a MessageGroupStoreReaper together with send-partial-result-on-expiry being set to true.
For the first one, here is the implementation:

For the second one:
and expireGroup(correlationKey, group) is called by forceComplete(group).

From what I can see here, I think an expired group is a group that is removed from the messageStore:


The 5th functionality is "recognizing and responding to an expired group". There are three places where an expired group is recognized:
  • expire-groups-upon-completion. If this option is set, a message group will be removed when it is completed. It is easy to see whether a message group is complete, using MessageGroup.isComplete(). However, the following code confused me:
Please take a look at handleMessageInternal(xxx).
When releaseStrategy.canRelease(messageGroup) is true, the message group will be marked as complete. But complete means that other messages targeting the same message group, in other words messages with the same correlationKey, will be discarded. Really? Can I find any documentation for this?

  • When the group is released for aggregation, all its not-yet-released messages are processed and removed from the group. If the group is also complete (i.e. if all messages from a sequence have arrived or if there is no sequence defined), then the group is marked as complete. Any new messages for this group will be sent to the discard channel (if defined).
  • Setting expire-groups-upon-completion to true (default is false) removes the entire group and any new messages, with the same correlation id as the removed group, will form a new group.
Here it is!! But I am still confused about "if the group is also complete". I didn't see this check in AggregatingMessageHandler.

And I think this is why my simple test didn't work: I was using the same correlation key, a string literal, for everything. When the release strategy was satisfied, the group became complete and stopped expecting any other messages. All messages after that were discarded, and I hadn't provided a discard channel.

However, if "expire-groups-upon-completion", message group will be removed, which is so called expired. When there is new message arrive, a new group with the same correlation key will be created and begin to accept message again.

MessageGroupProcessor

The Aggregation API consists of a number of classes. And the first one is:

  • The interface MessageGroupProcessor, and its subclasses: MethodInvokingAggregatingMessageGroupProcessor and ExpressionEvaluatingMessageGroupProcessor
It looks like a Bridge Pattern.

  • AbstractCorrelatingMessageHandler bridges its message group processing implementation to a MessageGroupProcessor. The Message Handler part can then focus on handling the messages. It has two variants: AggregatingMessageHandler and ResequencingMessageHandler.
  • For MessageGroupProcessor, there are three major categories.
Of course, you can call it a Strategy Pattern, because MessageGroupProcessor is the MessageHandler's strategy for handling MessageGroup.

Anyway, I am interested in its name: outputProcessor. Why?

This is the only place where outputProcessor is used. That means the MessageGroup is only touched when the Aggregator tries to complete the group, or tries to send out the message, and outputProcessor helps the Aggregator process the messages and generate the result(s).

So the name outputProcessor makes sense.

One thing to note here is that the result is sent out at line 379. I don't know why they have partialSequence here and why they return it, but it seems that nobody uses this return value in the subsequent call.

AbstractAggregatingMessageGroupProcessor

  • Here is a brief highlight of the base AbstractAggregatingMessageGroupProcessor (the responsibility of implementing the aggregatePayloads method is left to the developer)
    • aggregateHeaders(MessageGroup group)
    • aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders)

This is a typical Template Method solution. Since we usually don't want to change the aggregateHeaders() part, it is worth taking a look at the default implementation:
  • In a word, it removes all the inconsistent headers
    • ID
    • TIMESTAMP
    • SEQUENCE_NUMBER
    • conflict header
  • But it also removes SEQUENCE_SIZE. I think that is because they believe SEQUENCE_SIZE is only meaningful to the Aggregator. (A minimal example of subclassing this processor follows below.)
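As a concrete illustration of the Template Method here (a made-up example, not framework code), a processor that concatenates the String payloads of a group only needs to implement aggregatePayloads; aggregateHeaders keeps the default behavior described above:

import java.util.Map;

import org.springframework.integration.Message;
import org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor;
import org.springframework.integration.store.MessageGroup;

public class ConcatenatingGroupProcessor extends AbstractAggregatingMessageGroupProcessor {

    @Override
    protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
        StringBuilder sb = new StringBuilder();
        for (Message<?> message : group.getMessages()) {
            sb.append(message.getPayload()); // naive concatenation of all payloads
        }
        return sb.toString(); // becomes the payload of the single aggregated message
    }
}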



28.5.13

SI SFTP Support


  • Inbound Channel Adapter
  • Outbound Channel Adapter
  • Outbound Gateway

JSch

  • Every time an adapter requests a session object from its SessionFactory, a new SFTP session is being started.
  • Under the covers, the SFTP Session Factory relies on the JSch library to provide the SFTP capabilities.
  • Spring Integration also supports caching of SFTP sessions (see the sketch below)
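A rough sketch of that setup in code (host, port, and credentials are placeholders; the class names are the spring-integration-sftp ones as I understand them):

import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

public class SftpSessionConfig {

    // Builds the session factory the SFTP adapters ask for.
    public SessionFactory sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost("sftp.example.com"); // placeholder host
        factory.setPort(22);
        factory.setUser("demo");             // placeholder credentials
        factory.setPassword("secret");
        // CachingSessionFactory avoids starting a brand new SFTP session
        // every time an adapter requests one from the factory.
        return new CachingSessionFactory(factory);
    }
}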

Spring's property placeholder support

  • PropertyPlaceholderConfigurer
    • To externalize property values from a bean definition in a separate file using the standard Java Properties format.
  • <context:property-placeholder/>
    • From Spring 2.5
    • Supports a comma-separated list in the location attribute
  • System.properties
    • The PropertyPlaceholderConfigurer not only looks in the properties files you specify. By default it also checks the Java system properties if it cannot find a property in the specified properties files.
    • systemPropertiesMode
      • never(0)
      • fallback(1)
      • override(2)
  • Class name substitution

SFTP Inbound Channel Adapter

  • Is a special listener that will connect to the server and listen for the remote directory events
    • new file created
      • will initiate a file transfer
  • <int-sftp:inbound-channel-adapter/>
  • By default that transferred file will carry the same name as the original file.
    • local-filename-generator-expression
      • SpEL
      • The root object of the SpEL Evaluation Context is the original name of the remote file (String).
  • File filtering
    • filename-pattern
    • filename-regex
    • org.springframework.integration.file.filters.FileListFilter
  • SFTP Inbound Channel Adapter is a Polling Consumer.

27.5.13

UUID.randomUUID() is slow!

When I monitored the performance of my Spring Integration application with VisualVM, I found that a lot of threads were blocked. My application is simple, so I reworked every place where I had applied the synchronized keyword and tested it again. It didn't improve anything. So I generated a jstack dump. Funnily enough, it seemed Spring Integration itself was blocked by a lock from the core Java library.

  java.lang.Thread.State: BLOCKED (on object monitor)
        at java.security.SecureRandom.nextBytes(SecureRandom.java:455)
        - waiting to lock <0x00002aaac2aa3580> (a java.security.SecureRandom)
        at java.util.UUID.randomUUID(UUID.java:145)
        at org.springframework.integration.MessageHeaders.<init>(MessageHeaders.java:101)
        at org.springframework.integration.message.GenericMessage.<init>(GenericMessage.java:68)
        at org.springframework.integration.support.MessageBuilder.build(MessageBuilder.java:298)
        at org.springframework.integration.transformer.HeaderEnricher.transform(HeaderEnricher.java:119)
        at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:67)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
        at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:154)
        at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
        at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)

By using Google, I found this discussion:


Performance Test Result for the Default UUID Generator

The default UUID generator has bad performance. But how bad is it? I tried to generate 1 billion (1024 * 1024 * 1024) UUIDs in a single thread on my Windows PC (i7 2620M), and it took about 450 seconds. That is about 1 millisecond to generate 2K UUIDs, and 2 seconds for 2M.
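The single-threaded measurement can be reproduced with a loop as trivial as the following sketch (my own harness, not the original test code):

import java.util.UUID;

public class UuidBenchmark {

    public static void main(String[] args) {
        long iterations = 1L << 30; // 1024 * 1024 * 1024, roughly one billion
        long start = System.currentTimeMillis();
        for (long i = 0; i < iterations; i++) {
            UUID.randomUUID(); // every call goes through a shared SecureRandom
        }
        System.out.println("took " + (System.currentTimeMillis() - start) / 1000 + " s");
    }
}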

Since it uses a blocking strategy, it looks bad in a multithreaded environment. I got the following monitoring picture when I tried to use 10 threads to generate the same 1 billion UUIDs.


It took 800 seconds to finish the work.

The One with the Best Performance in a Single Thread but Worse Performance under Heavy Load with Multiple Threads

This is an implementation that gave me very good performance when running in a single thread, and probably also under a light load.

The tricky part is this:

boolean found = false;
long n = 0;
int c = 0;

while (!found) {
    c = counter.get();
    n = lastNano;

    if (c < MASK) {
        // claim the value
        found = counter.compareAndSet(c, c + 1); // <1>
        continue;
    } else {
        if (counter.compareAndSet(c, 0)) { // <2>
            n = System.nanoTime();
            if (n < lastNano)
                n = lastNano++;
            else
                lastNano = n;

            c = 0;
            found = true;
        }
    }
}

This is actually an IMPROVED solution, but apparently it is not improved as much as I expected. I believed System.nanoTime() would be somewhat heavy: it took me 15 seconds to call System.nanoTime() 1 billion times. So I thought I don't really need to call System.nanoTime() every time. For a certain number of iterations, I just need to increment a counter against the same nano value, and I will still get different IDs.

However, I need to update the nano value when the counter runs out. This is a typical synchronization issue. To avoid a performance hit, I tried to implement it in a non-blocking way. The counter is designed as the synchronization guard: a thread claims the currently stored nano value, then re-checks the counter to finally assure its ownership, which is implemented at <1>. If the counter reaches its capacity, only one of the competing threads can reset it and update the nano value, while the others need to retry, which is implemented at <2>.

It doesn't look bad at all. In a single thread, it took about 20 seconds to generate 1 billion items. However, when I started 10 threads to generate items heavily, a lot of retries happened at <1>, which ruined the performance. I couldn't even finish my test.

Even though I still believe this is not a very bad idea, because nobody would just start a lot of threads and do nothing but generate UUIDs, I don't want this solution in my project, because System.nanoTime() is not that heavy.

A Workable Solution

This is a workable solution. In a single-threaded test, it took 60 seconds to generate 1 billion items. Since pure System.nanoTime() calls alone take 15 seconds for 1 billion iterations, this solution is fast, and it is 85% faster than the JDK version. When I tried it with 10 threads, it took 100 seconds to finish the work with monitoring enabled (JVisualVM). It fully utilized my CPUs, while the default version only reached 60% to 70% CPU utilization. No blocking issues at all.

The bad thing about this solution is that while it is running on your PC, you can barely move your mouse.
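The actual implementation isn't reproduced here. As a rough illustration only (my own sketch, not the author's code), a generator in this spirit can be as small as the following: every call pays for one System.nanoTime() plus one atomic increment, and no thread ever blocks on a shared monitor.

import java.util.concurrent.atomic.AtomicLong;

public class NanoSequenceIdGenerator {

    private final AtomicLong sequence = new AtomicLong();

    // Combines the current nano timestamp with a monotonically increasing counter;
    // the counter reduces the chance of duplicates when calls land on the same nano tick.
    public long nextId() {
        long nanos = System.nanoTime();
        long seq = sequence.getAndIncrement();
        return (nanos << 16) ^ (seq & 0xFFFF);
    }
}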

26.5.13

SpEL!!

8. Spring Expression Language (SpEL)


My lack of knowledge of SpEL is costing me these days as I try out Spring Integration in the project. So I decided to learn more about it. I need the following information about SpEL.

  • Features
  • Syntax
  • Performance
Note that the following information is a set of notes for myself. Please don't waste time on this unorganized information. Thanks.

Introduction

  • The Spring Expression Language (SpEL for short) is a powerful expression language that supports querying and manipulating an object graph at runtime.
Does that mean SpEL is not a general-purpose expression language? I am not familiar with any other expression language either, so I can't tell what the difference is here.
  • The language syntax is similar to Unified EL but offers additional features, most notably method invocation and basic string templating functionality.
    • The Java Unified Expression Language is a special purpose programming language mostly used in Java web applications for embedding expressions into web pages. The Java specification writers and expert groups of the Java web-tier technologies have worked on a unified expression language which is now part of the JSP 2.1 specification (JSR-245). 
    • The expression language started out as part of the JavaServer Pages Standard Tag Library (JSTL) and was originally called SPEL (Simplest Possible Expression Language), then just Expression Language (EL). It was a scripting language which allowed access to Java components (JavaBeans) through JSP. Since JSP 2.0, it has been used inside JSP tags to separate Java code from JSP, and to allow easier access to Java components (than in Java code).
It turns out that Unified EL is something I knew of but am not good at.
  • Features of Unified EL
    • The new unified EL is a union of the JSP and JSF expression languages. In addition to the features already available in the JSP EL, the unified EL has the following features:
      • Deferred evaluation
      • Support for expressions that can set values and expressions that can invoke methods
      • A pluggable API for resolving expressions


It seems that Unified EL can also invoke methods, so SpEL's claim that it differs from Unified EL in method invocation may not be true. But anyway, let's live with it for now.

Feature Overview


The expression language supports the following functionality
  • Literal expressions
  • Boolean and relational operators
  • Regular expressions
  • Class expressions
  • Accessing properties, arrays, lists, maps
  • Method invocation
  • Relational operators
  • Assignment
  • Calling constructors
  • Bean references
  • Array construction
  • Inline lists
  • Ternary operator
  • Variables
  • User defined functions
  • Collection projection
  • Collection selection
  • Templated expressions

Expression Evaluation using Spring's Expression Interface

  • The StandardEvaluationContext is relatively expensive to construct and during repeated usage it builds up cached state that enables subsequent expression evaluations to be performed more quickly. For this reason it is better to cache and reuse them where possible, rather than construct a new one for each expression evaluation.
  • More common usage is to provide only the SpEL expression string as part of a configuration file, for example for Spring bean or Spring Web Flow definitions. In this case, the parser, evaluation context, root object and any predefined variables are all set up implicitly, requiring the user to specify nothing other than the expressions.
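A minimal usage sketch of the Expression interface (the expression strings are just examples):

import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

public class SpelQuickStart {

    public static void main(String[] args) {
        ExpressionParser parser = new SpelExpressionParser();

        // Literal expression with a method call on the resulting String.
        String greeting = parser.parseExpression("'Hello World'.concat('!')").getValue(String.class);

        // Evaluate against a root object; reuse the (relatively expensive) context.
        StandardEvaluationContext context = new StandardEvaluationContext("Spring Expression Language");
        Expression lengthExpr = parser.parseExpression("length()");
        Integer length = lengthExpr.getValue(context, Integer.class);

        System.out.println(greeting + " / root length = " + length);
    }
}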

The EvaluationContext interface

  • The out-of-the-box implementation, StandardEvaluationContext, uses reflection to manipulate the object, caching java.lang.reflect's Method, Field, and Constructor instances for increased performance.
    • setRootObject()
    • setVariable()
    • registerFunction()
    • Register custom ConstructorResolvers, MethodResolvers, and PropertyAccessors to extend how SpEL evaluates expressions. (See JavaDoc)
  • Generics Aware!!

Expression support for defining bean definitions

  • In both cases the syntax to define the expression is of the form #{ <expression string> }.

Language Reference

Literal Expressions

  • Supported literal expressions
    • Strings
    • Dates
    • Numeric values
    • Boolean
    • null
  • Strings are delimited by single quotes
    • To put a single quote itself in a string use two single quote characters.

Properties, Arrays, Lists, Maps, Indexers

  • Case insensitivity is allowed for the first letter of property names.

Inline lists

Array construction

Methods

  • Using typical Java Programming syntax.

Operators

  • Supports the 'instanceof' and regular expression based 'matches' operator.
  • Each symbolic operator can also be specified as a purely alphabetic equivalent.
    • lt
    • gt
    • le
    • ge
    • eq
    • ne
    • div
    • mod
    • not

Logical operators

  • and
  • or
  • not

Mathematical operators

Assignment

Types

  • The special 'T' operator can be used to specify an instance of java.lang.Class (the 'type').
  • Static methods are invoked using this operator as well.
  • StandardEvaluationContext uses a TypeLocator to find types and the StandardTypeLocator (which can be replaced) is built with an understanding of the java.lang package.
    • This means T() references to types within java.lang do not need to be fully qualified, but all other type references must be.

Constructors

Variables

  • #variableName
  • setVariable()

#this and #root

  • #this is always defined and refers to the current evaluation object.
  • #root is always defined and refers to the root context object.
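For example, #this shows up naturally in collection selection (a small sketch using the standard SpEL API; the #primes variable is made up):

import java.util.Arrays;
import java.util.List;

import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

public class SpelThisExample {

    public static void main(String[] args) {
        ExpressionParser parser = new SpelExpressionParser();
        StandardEvaluationContext context = new StandardEvaluationContext();
        context.setVariable("primes", Arrays.asList(2, 3, 5, 7, 11, 13, 17));

        // #this refers to the element currently being evaluated in the selection.
        List<?> bigPrimes = (List<?>) parser
                .parseExpression("#primes.?[#this > 10]")
                .getValue(context);

        System.out.println(bigPrimes); // [11, 13, 17]
    }
}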

Functions

  • registerFunction(String name, Method m)

Bean references

  • If the evaluation context has been configured with a bean resolver, it is possible to look up beans from an expression using the @ symbol
  • @foo

25.5.13

Technology Radar May 2013 | ThoughtWorks

Shared by one of my friends.

How To Implement Steps in Spring Integration?

Spring Integration is based on a message-driven architecture: you have to drive everything along using messages. If a Message Endpoint, such as a Service Activator or a Gateway, returns void, the messaging flow will be interrupted.

However, what if I have multiple Service Activators that need to be driven one by one, synchronously? That's a very simple requirement in any programming language, but I couldn't find an easy way to make it work in Spring Integration. You must have a message to drive the next endpoint, as the name implies.

Finally, I found out that I could use <int:request-handler-advice-chain> to make it work.

7.7 Adding Behavior to Endpoints

My Example Code





21.5.13

How Spring DM (1.2.1) Creates Beans for OSGi Services

AbstractServiceProxyCreator

  • createServiceProxy(ServiceReference)

OsgiServiceProxyFactoryBean

  • createProxy()
Object createProxy() {
    if (log.isDebugEnabled())
        log.debug("Creating a single service proxy ...");

    // first create the TCCL interceptor to register its listener with the
    // dynamic interceptor
    final ServiceProviderTCCLInterceptor tcclAdvice = new ServiceProviderTCCLInterceptor();
    final OsgiServiceLifecycleListener tcclListener = tcclAdvice.new ServiceProviderTCCLListener();

    final ServiceDynamicInterceptor lookupAdvice = new ServiceDynamicInterceptor(getBundleContext(),
            ClassUtils.getParticularClass(getInterfaces()).getName(), getUnifiedFilter(), getAopClassLoader());

    lookupAdvice.setRequiredAtStartup(getCardinality().isMandatory());

    OsgiServiceLifecycleListener[] listeners = addListener(getListeners(), tcclListener);

    lookupAdvice.setListeners(listeners);
    synchronized (monitor) {
        lookupAdvice.setRetryTimeout(retryTimeout);
        retryTemplate = lookupAdvice.getRetryTemplate();
    }
    lookupAdvice.setApplicationEventPublisher(applicationEventPublisher);

    // add the listeners as a list since it might be updated after the proxy
    // has been created
    lookupAdvice.setStateListeners(stateListeners);
    lookupAdvice.setServiceImporter(this);
    lookupAdvice.setServiceImporterName(getBeanName());

    // create a proxy creator using the existing context
    ServiceProxyCreator creator = new AbstractServiceProxyCreator(getInterfaces(), getAopClassLoader(),
            getBeanClassLoader(), getBundleContext(), getContextClassLoader()) {

        ServiceInvoker createDispatcherInterceptor(ServiceReference reference) {
            return lookupAdvice;
        }

        Advice createServiceProviderTCCLAdvice(ServiceReference reference) {
            return tcclAdvice;
        }
    };

    ProxyPlusCallback proxyPlusCallback = creator.createServiceProxy(lookupAdvice.getServiceReference());

    synchronized (monitor) {
        proxy = proxyPlusCallback.proxy;
        destructionCallback = new DisposableBeanRunnableAdapter(proxyPlusCallback.destructionCallback);
    }

    lookupAdvice.setProxy(proxy);
    // start the lookup only after the proxy has been assembled
    lookupAdvice.afterPropertiesSet();

    return proxy;
}

20.5.13

Better to Use <int:header-enricher/> Instead of <int-mail:header-enricher/>


<int:header-enricher id="mailHeaderEnricher" input-channel="channel-in" output-channel="outboundMail">
    <int:header name="mail_subject" expression="'Ex : ' + payload" />
    <int:header name="mail_to" value="XXX@gmail.com" />
    <int:header name="mail_from" value="XXX@gmail.com" />
</int:header-enricher>


Accessing Gmail Using JavaMail


<util:properties id="gmailProperties587">
    <prop key="mail.smtp.host">smtp.gmail.com</prop>
    <prop key="mail.smtp.auth">true</prop>
    <prop key="mail.smtp.port">587</prop>
    <prop key="mail.debug">true</prop>
    <prop key="mail.smtp.starttls.enable">true</prop>
    <prop key="mail.smtp.quitwait">false</prop>
</util:properties>

<util:properties id="gmailProperties465">
    <prop key="mail.smtp.host">smtp.gmail.com</prop>
    <prop key="mail.smtp.auth">true</prop>
    <prop key="mail.smtp.port">465</prop>
    <prop key="mail.debug">true</prop>
    <prop key="mail.smtp.quitwait">false</prop>
    <prop key="mail.smtp.socketFactory.port">465</prop>
    <prop key="mail.smtp.socketFactory.class">javax.net.ssl.SSLSocketFactory</prop>
    <prop key="mail.smtp.socketFactory.fallback">false</prop>
</util:properties>

15.5.13

Service Activator in Spring Integration

Travel of Software Developer: Understanding Service Activator

Just want to continue this topic and focus on Spring Integration's implementation.

How Does Spring Integration Describe its Service Activator?


  • A Service Activator is a generic endpoint for connecting a service instance to the messaging system.
That's great!
  • A generic endpoint.
  • Connecting a service instance.
  • To the messaging system.

A Generic Endpoint

As I mentioned in the previous post, the Service Activator is often abused because of its generality. Does this confirm that problem here? I am not sure.
  • The input Message Channel must be configured, and if the service method to be invoked is capable of returning a value, an output Message Channel may also be provided.
    • The output channel is optional, since each Message may also provide its own "Return Address" header. This same rule applies for all consumer endpoints.
This matches the Pattern characteristics in EIP:
  • A Service Activator can be one-way (request only) or two-way (Request-Reply).
  • The service can be as simple as a method call - synchronous and non-remote - perhaps part of a Service Layer.

More Information about Service Activator in Spring Integration

Implementation of Service Activator in Spring Integration


This is the Factory Bean used to create the Service Activator bean. The implementation of the Service Activator's Factory Bean is very different from the way GatewayProxyFactoryBean is implemented, but I don't know why.
GatewayProxyFactoryBean comes all the way from IntegrationObjectSupport, AbstractEndpoint, and AbstractPollingEndpoint, which makes sense for a Messaging Endpoint. Although the Service Activator is also a Messaging Endpoint, it derives from AbstractSimpleMessageHandlerFactoryBean<H> and then AbstractStandardMessageHandlerFactoryBean. That also makes sense from the Factory Bean's point of view. And the real handler for the Service Activator is ServiceActivatingHandler:

Although Service Activator can also use SpEL:
  • Since Spring Integration 2.0, Service Activators can also benefit from SpEL.
I am going to focus only on ServiceActivatingHandler now.

ServiceActivatingHandler

@Override
protected Object handleRequestMessage(Message<?> message) {
    try {
        return this.processor.processMessage(message);
    }
    catch (Exception e) {
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        }
        throw new MessageHandlingException(message, "failure occurred in Service Activator '" + this + "'", e);
    }
}

This is how ServiceActivatingHandler is implemented. But wait, I don't see anything asynchronous!! And MessageProcessor<T> sounds new to me. Let's take a look at how this MessageProcessor is constructed.

MessageProcessor<T> and MethodInvokingMessageProcessor<T>

@Override
MessageHandler createMethodInvokingHandler(Object targetObject, String targetMethodName) {
    ServiceActivatingHandler handler = (StringUtils.hasText(targetMethodName))
            ? new ServiceActivatingHandler(targetObject, targetMethodName)
            : new ServiceActivatingHandler(targetObject);
    return this.configureHandler(handler);
}

So, this is as simple as a constructor.

public ServiceActivatingHandler(final Object object) {
    this(new MethodInvokingMessageProcessor<Object>(object, ServiceActivator.class));
}

And the essential part is MethodInvokingMessageProcessor<Object>, while in the Gateway the equivalent is MethodInvocationGateway, which extends MessagingGatewaySupport.

private final MessagingMethodInvokerHelper<T> delegate;

public T processMessage(Message<?> message) {
    try {
        return delegate.process(message);
    }
    catch (Exception e) {
        throw new MessageHandlingException(message, e);
    }
}

MessagingMethodInvokerHelper<T>, The Delegate

I don't want to look into this class right now, because I believe it just tries to find and call the right method via basic reflection. If I find out I am wrong later, I will come back.

@Async and Annotation Driven Task Executor

  • To enable both @Scheduled and @Async annotations, simply include the 'annotation-driven' element from the task namespace in your configuration.
This is the way to support an asynchronous Service Activator with a DirectChannel.
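If you prefer Java configuration over the task namespace, a rough equivalent (assuming Spring 3.1/3.2; the class name and pool sizes are arbitrary) would be:

import java.util.concurrent.Executor;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync // enables @Async processing, like <task:annotation-driven/>
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);    // arbitrary sizes, tune for your load
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(100);
        executor.initialize();          // required when building the executor by hand
        return executor;
    }
}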

Implementation of Service Activator with @Async 


Message Return from ServiceActivator with @Async

In the above example, I used a one-way Service Activator. This is a very natural way of using a Service Activator: you can inject the successive channel into the Service Activator so that it can send out or receive messages flexibly, as sketched below. I think this is the so-called Half-Sync/Half-Async way, though I'm not sure.
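A minimal sketch of that idea (the class name, the injected channel, and the uppercasing logic are all invented for illustration; package names assume Spring Integration 2.x):

import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.scheduling.annotation.Async;

public class OneWayActivator {

    private final MessageChannel nextChannel;

    public OneWayActivator(MessageChannel nextChannel) {
        this.nextChannel = nextChannel; // the successive channel is injected
    }

    @Async
    @ServiceActivator
    public void handle(Message<String> msg) {
        // Do the work asynchronously, then push the result onward explicitly.
        nextChannel.send(MessageBuilder.withPayload(msg.getPayload().toUpperCase()).build());
    }
}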

However, sometimes we might want to get some output from the Service Activator. In other words, we want a two-way Service Activator. You won't have a problem with a synchronous Service Activator, but you will with @Async. If you do it the normal way, such as:

@Async
@ServiceActivator
public Message<Object> handleMessage(Message<String> msg)

You won't get anything as its output. Why?

AnnotationAsyncExecutionInterceptor.invoke(MethodInvocation)

public Object invoke(final MethodInvocation invocation) throws Throwable {
    Future<?> result = this.determineAsyncExecutor(invocation.getMethod()).submit(
            new Callable<Object>() {
                public Object call() throws Exception {
                    try {
                        Object result = invocation.proceed();
                        if (result instanceof Future) {
                            return ((Future<?>) result).get();
                        }
                    }
                    catch (Throwable ex) {
                        ReflectionUtils.rethrowException(ex);
                    }
                    return null;
                }
            });
    if (Future.class.isAssignableFrom(invocation.getMethod().getReturnType())) {
        return result;
    }
    else {
        return null;
    }
}

This is where @Async is handled. As you can see, the result is only kept if the method's return type is a Future<T>; otherwise, the result is simply discarded.

@Async
@ServiceActivator
public Future<Message<Object>> handleMessage(Message<String> msg) throws Exception {
    return new AsyncResult<Message<Object>>(XXXX);
}

For a complete example, please see this.

But how can the Service Activator pass the return value down to successive handlers? I don't think there is an easy solution. And that's maybe why you won't see any documentation from Spring Integration telling you to use @Async with a Service Activator.

Service Activator with QueueChannel




Asynchronous Gateway and Parallel DirectChannel

Parallel DirectChannel

DirectChannel could work in a Parallel way:

<int:channel id="channel-in">
<int:dispatcher load-balancer="round-robin" task-executor="taskExec"/>
</int:channel>
However, a channel with a dispatcher doesn't necessary be handled in parallel. In the above example, if the Gateway expects some return, DirectChannel won't returned immediately after execute the job in another executor. Although the execution is driven into another thread, but the current thread will be blocked for the reply:

MessagingTemplate:

private <S, R> Message<R> doSendAndReceive(MessageChannel channel, Message<S> requestMessage) {
    Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
    Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
    TemporaryReplyChannel replyChannel = new TemporaryReplyChannel(this.receiveTimeout);
    requestMessage = MessageBuilder.fromMessage(requestMessage)
            .setReplyChannel(replyChannel)
            .setErrorChannel(replyChannel)
            .build();
    this.doSend(channel, requestMessage);
    Message<R> reply = this.doReceive(replyChannel);
    if (reply != null) {
        reply = MessageBuilder.fromMessage(reply)
                .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
                .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
                .build();
    }
    return reply;
}

The doReceive(...) method blocks the caller's thread until it receives the reply from the replyChannel, which effectively makes the work synchronous.
If you really want a reply from the DirectChannel while having it work in parallel, you need an Asynchronous Gateway or something similar.

Asynchronous Gateway

You can of course use the default asynchronous executor, which is an AsyncTaskExecutor. But the default executor will create as many threads as it can to run the Gateway, and sometimes it will drain your system's resources without proper control. So you might want to manually set up an executor with some limits.




Revert Change in Eclipse/Git

http://stackoverflow.com/questions/6788881/undo-single-file-local-uncommitted-change-in-egit-e-g-svn-revert

Google Code, Git, and Eclipse

Create a new Project in Google Code

  • Open any projects.
  • From "My favorites" drop-down menu, you will see "Create a project...", click on it.
  • Fill in all Options except Project labels. Choose Git as the Version Control System.
  • Click on "Create project" button.

Find Access Information

  • Open "Source" tab.
  • etc. etc. etc.

Clone a Repository

  • Run this command
git clone https://<your user name>@code.google.com/p/simple-demo-set/ 

  • Create a README.txt under the repository.
  • git add README.txt
  • git commit -m "Add the first file"
  • create $HOME/.netrc (I'm using cygwin)
machine code.google.com
login XXXX
password GOOGLEGENERATED
  • git push -u origin master

Add Git Repository into Eclipse

  • Open Git Repositories Exploring Perspective
  • Click on "Add an existing local Git Repository to this view"
  • Select the designated directory location and click on Finish.

Share Project from Eclipse

  • Choose the repository instead of creating a local one.

Create .gitignore

It seems that Eclipse's global ignore settings don't work for Git. You have to create your own .gitignore for the Git repository. These are the entries you probably want to add.

.classpath
.project
.settings
target


14.5.13

Storm?





  • Whereas Hadoop targets batch processing, Storm is an always-active service that receives and processes unbound streams of data.
    • Like Hadoop, Storm is a distributed system that offers massive scalability for applications that store and manipulate big data. 
    • Unlike Hadoop, it delivers that data instantaneously, in realtime.
  • It is written primarily in Clojure and supports Java by default.
  • Use cases
    • Realtime analytics
    • Online machine learning
    • Continuous computation
    • Distributed RPC
    • ETL
  • How does storm differ from Hadoop?
    • The simple answer is that Storm analyzes realtime data while Hadoop analyzes offline data.
    • In truth, the two frameworks complement one another more than they compete.
  • Hadoop
    • Provides its own file system (HDFS)
    • Manages both data and code/tasks.
    • It divides data into blocks and when a "job" executes, it pushes analysis code close to the data it is analyzing.
      • This is how Hadoop avoids the overhead of network communication in loading data -- keeping the analysis code next to the data enables Hadoop to read it faster by orders of magnitude.
    • MapReduce
      • Hadoop partitions data into chunks and passes those chunks to mappers that map keys to values.
      • Reducers then assemble those mapped key/value pairs into a usable output.
      • The MapReduce paradigm operates quite elegantly but is targeted at data analysis.
    • HDFS
      • In order to leverage all the power of Hadoop, application data must be stored in the HDFS file system.
  • Storm
    • Storm solves a different problem altogether.
    • Realtime
      • meaning right now
      • Storm is interested in understanding things that are happening in realtime, and interpreting them.
    • File System
      • Storm does not have its own file system.
    • Programming Paradigm
      • Its programming paradigm is quite a bit different from Hadoop's.
      • Storm is all about obtaining chunks of data, known as spouts, from somewhere and passing that data through various processing components, known as bolts.
      • Storm's data processing mechanism is extremely fast and is meant to help you identify live trends as they are happening.
      • Unlike Hadoop, Storm doesn't care what happened yesterday or last week.
  • Architecture
    • At the highest level, Storm is comprised of topologies.
      • A topology is a graph of computations
        • Each node contains processing logic and each path between nodes indicates how data should be passed between nodes.
    • Inside topologies you have networks of streams, which are unbounded sequences of tuples.
      • Storm provides a mechanism to transform streams into new streams using spouts and bolts.
      • Spouts
        • Spouts generate streams, which can pull data from a site like Twitter or Facebook and then publish it in an abstract format.
      • Bolts
        • Bolts consume input streams, process them, and then optionally generate new streams.
    • Tuples
      • Storm's data model is represented by tuples.
        • A tuple is a named list of values of any type.
        • Storm supports all primitive types, Strings, and byte arrays, and you can build your own serializer if you want to use your own object types.
      • Your spouts will "emit" tuples 
      • And your bolts will consume them.
      • Your bolts may also emit tuples if their output is destined to be processed by another bolt downstream.
      • Basically, emitting tuples is the mechanism for passing data from a spout to a bolt, or from a bolt to another bolt.
This is quite a normal architecture for an expandable or scalable computational system, or network. And the name Topologies, the graph, makes a lot of sense.

It's a directed graph. If we add a root node above the Spouts, it would look like a tree, but with shared children. It reminds me of the Travelling Salesman Problem (TSP, Travelling salesman problem - Wikipedia, the free encyclopedia).
  • Storm Cluster
    • A Storm Cluster is somewhat similar to Hadoop clusters, but while a Hadoop cluster runs map-reduce jobs, Storm runs topologies.
      • Map-reduce jobs eventually end.
      • Topologies are destined to run until you explicitly kill them.
    • Storm clusters define two types of nodes
      • Master Node
        • This node runs a daemon process called Nimbus.
        • Nimbus is responsible for distributing code across the cluster, assigning tasks to machines, and monitoring the success and failure of units of work.
      • Worker Nodes
        • These nodes run a daemon process called the Supervisor.
        • A Supervisor is responsible for listening for work assignments for its machine.
        • It then subsequently starts and stops worker processes. Each worker process executes a subset of a topology, so that the execution of a topology is spread across a multitude of worker processes running on a multitude of machines.
This is a typical cluster architecture. I once designed a system with the Master Node named MC (MultiController) and the Worker Nodes named LC (LocalController). The MC takes care of job distribution, but it doesn't monitor the success or failure of units of work. It monitors the LCs' health and relocates Worker Nodes to ensure high availability.

My system was not designed for dynamic expansion. There are only three fixed layers. The LC, the Worker Node, runs as a daemon and supervises a list of Engines. Those Engines are created dynamically from some configuration package, the work assignments if you will. The LC starts and stops these Engines, the worker processes if you will. Those worker processes work basically independently, which made my cluster simpler than this. I think having each worker process execute a subset of a topology, and coordinating those topologies, would be a challenge.

In my cluster, each Worker Node takes care of the success or failure of its own worker processes.
  • ZooKeeper
    • Sitting between the Nimbus and the various Supervisors is ZooKeeper.
    • ZooKeeper's goal is to enable highly reliable distributed coordination, mainly by acting as a centralized service for distributed cluster functionality.
    • Storm topologies are deployed to the Nimbus and then the Nimbus deploys spouts and bolts to Supervisors.
      • When it comes time to execute spouts and bolts, the Nimbus communicates with the Supervisors by passing messages to ZooKeepers.
      • ZooKeeper maintains all state for the topology, which allows the Nimbus and Supervisors to be fail-fast and stateless: if the Nimbus or Supervisor processes go down, the state of processing is not lost.
      • Work is reassigned to another Supervisor and processing continues.
      • Then, when the Nimbus or a Supervisor is restarted, they simply rejoin the cluster and their capacity is added to the cluster.
My first question would be: how many Nimbus nodes are in the system? It seems only one. If this is true, we need some way to eliminate this single point of failure (Single point of failure - Wikipedia, the free encyclopedia). So I think ZooKeeper acts as an independent configuration store outside of Nimbus. Nimbus goes through ZooKeeper to deploy Supervisors and manage them. ZooKeeper is clustered and does not have a single-point-of-failure problem.

Of course, this is what I guess so far.

  • Storm Development Environment
    • Local Mode:
      • Storm executes topologies completely in-process by simulating worker nodes using threads.
    • Distributed Mode:
      • In distributed mode, it runs across a cluster of machines.