Syntax of JavaFBP (Java Implementation of FBP)
and Component API

Network Definitions

In my book, "Flow-Based Programming", I describe the syntax of the network specifications of various FBP dialects that were in existence when the book was written. JavaFBP, the Java implementation of the FBP concepts, did not exist at that time, so this web page has been added describing the syntax of JavaFBP network definitions.

In JavaFBP we not only code components in Java but also define the networks as Java programs. The source code is on SourceForge under Subversion (SVN) for the Flow-Based Programmming project: SVN for FBP .

There is also a jar file - JavaFBP jar file - on the FBP web site.

One advantage of defining the network as executable code, as compared with other approaches that merely list connections in a language-independent way, is that the network can contain additional logic. This logic then controls the way the network is defined, rather than the way it runs. Some may regard this as a defect, rather than as an asset, and both views can certainly be defended, but one of the neat things it enables us to do is to adjust multiplexing levels of various structures in the diagram using a table of values (remember the multiplexing example in Sample DrawFlow Diagram). One merely retrieves a value from a table for the degree of multiplexing in a particular structure in the diagram, and this value is then used both as the index of a loop invoking the connect statement, and as the index for the elements of an array-type port (see below for both of these terms).

Since the way the syntax relates to the underlying diagram may not be all that clear, a brief description is in order.  At the end of this page, I have given an extremely simple JavaFBP component.

Any JavaFBP network definition starts as follows:


public class xxxxxx extends Network {

protected void define() {


where xxxxxx is the Network name, including the usual imports, copyright statements, etc. Of course you will have to import classes for JavaFBP objects, such as Network and Component, as well as any JavaFBP "verb" classes you may be using.

The network definition is terminated with:

}
public static void main(String[] argv) throws Exception {
new xxxxxx().go();
}
}

In between the beginning and the ending statements defining the network, you specify a list of connections, using the following methods, which I will refer to as "clauses":

Every component instance must have a unique character string identifying it, which allows other component instances or initial information packets (IIPs) to be attached to it via a connection.

The following method call: 

component("xxxx")

returns a reference to a component instance. The first reference to this particular component instance must specify the component class to be executed by that occurrence. This is done by coding

component("xxxx", cccc.class)

where cccc  is the name of the Java module to be executed.

Similarly, a port is identified by a port clause, e.g. port("xxxx").

A port may be an array-type port, in which case the port clauses referring to its elements have index values, as follows: 

port("xxxx",n)

where "n" runs up monotonically from 0. Each element of the port array will be connected to a different component occurrence or IIP.

A connect or initialize clause may contain the relevant component clauses, together with their corresponding port clauses, embedded within it, as e.g.

     connect(component("Read", ReadText.class),
         port("OUT"),
         component("Splitter1", Splitter1.class),
         port("IN"));

or the connect and component portions may be in separate statements, provided component precedes any connects that reference it.

A connect contains:

Optionally a connect may have a fifth parameter: the connection capacity, specified as an int.  If this is omitted, the default value is used: 1 for testing, or 10 for production (this currently has to be changed by hand in the Network.class). 

If an asterisk (*) is specified for the "from" port, this is called an "automatic output port", and indicates a signal generated when the "from" component instance terminates (actually the port is just closed, so no packet has to be disposed of). 

If an asterisk is specified for the "to" port, this is called an "automatic input port", and indicates a delay - the "to" component instance does not start until a signal or a close is received at this port.

If *SUBEND is specified as a port name on a subnet, a packet containing null is emitted at this port every time the subnet deactivates, i.e. all the contained components terminate.  It doesn't have to be named in the port metadata.  This null packet is emitted for all activations, including the last one.

An initialize clause contains:

as e.g.

     initialize(new FileReader(
"c:\\com\\jpmorrsn\\eb2engine\\test\\data\\myXML3.txt"),
       component("Read"),
           port("SOURCE"));

However, it has been recommended that IIPs should be strings, rather than arbitrary objects, to facilitate future graphical management of networks.

One last point: any number of "from" ports can be connected to a single "to" port; only one "to" port can ever be connected to a given "from" port.

Sample Network

Let us code up a network implementing the following picture:

 

First list the component clauses, together with the component classes they are to execute (assuming that component classes have been written to execute the various nodes of the diagram), e.g.:

 
component("Read Masters",Read.class)
component("Read Details",Read.class)
component("Collate",Collate.class)
component("Process Merged Stream",Proc.class)
component("Write New Masters",Write.class)
component("Summary & Errors",Report.class)

Now these component clauses may either be made into separate statements or they can be imbedded into the connect statements that follow.  Here are the connections in the diagram, without imbedded component clauses:

  connect(component("Read Masters"),port("OUT"),component("Collate"),
port("IN",0));    // array port
connect(component("Read Details"),port("OUT"),component("Collate"),
port("IN",1));      // array port
connect(component("Collate"),port("OUT"),
component("Process Merged Stream"), port("IN"));
connect(component("Process Merged Stream"),port("OUTM"),
component("Write New Masters"),port("IN"));
connect(component("Process Merged Stream"),port("OUTSE"),
component("Summary & Errors"),port("IN"));

Each item in this list is a separate Java statement.

We can now add the class designation to the first component clause referencing a particular component occurrence, giving the following:

  
connect(component("Read Masters",Read.class),port("OUT"),
    component("Collate",Collate.class), port("IN",0)); // array port
  connect(component("Read Details",Read.class),port("OUT"),
     component("Collate"),port("IN",1)); // array port
  connect(component("Collate"),port("OUT"),
component("Process Merged Stream",Proc.class),port("IN"));
connect(component("Process Merged Stream"),port("OUTM"),
component("Write New Masters",Write.class),port("IN"));
connect(component("Process Merged Stream"),port("OUTSE"),
component("Summary & Errors",Report.class),port("IN"));

Now "Read Masters" and "Read Details" use the same Java class, so we need some way to indicate the name of the file that each is going to read. This is done using Initial Information Packets (IIPs). In this case they might usefully specify FileReader objects, so we need to add two initialize clauses, as follows:

  
initialize(new FileReader("c:\\mastfile"),
component("Read Masters"),
port("SOURCE"));
initialize(new FileReader("c:\\detlfile"),
component("Read Details"),
port("SOURCE"));

Note that, since both "Read" component occurrences use the same class code, they naturally have the same port names - of course, the ports are attached to different IIPs.

Remember that back-slashes have to be doubled in Java character strings!

"Write New Masters" will have to have an IIP to specify the output destination - perhaps:

  
initialize(new FileWriter("c:\\newmast"),
component("Write New Masters"),
port("DESTINATION"));

Note also that this IIP is not a destination for the Writer - it is an object used by this component occurrence so that the latter can figure out where to send data to.

Add the beginning and ending statements, and you're done!   The actual sequence of connect and initialize statements is irrelevant.

Here is the final result:

  
public class xxxxxx extends Network {

protected void define() {
connect(component("Read Masters",Read.class),port("OUT"),
component("Collate",Collate.class),port("IN",0)); // array port
connect(component("Read Details",Read.class),port("OUT"),
component("Collate"),port("IN",1));// array port
connect(component("Collate"),port("OUT"),
component("Process Merged Stream",Proc.class),port("IN"));
connect(component("Process Merged Stream"),port("OUTM"),
component("Write New Masters",Write.class),port("IN"));
connect(component("Process Merged Stream"),port("OUTSE"),
component("Summary & Errors",Report.class),port("IN"));
initialize(new FileReader("c:\\mastfile"),
component("Read Masters"),
port("SOURCE"));
initialize(new FileReader("c:\\detlfile"),
component("Read Details"),
port("SOURCE"));
initialize(new FileWriter("c:\\newmast"),
component("Write New Masters"),
port("DESTINATION"));

}

public static void main(String[] argv) throws Exception{
// as of JavaFBP-2.4
new xxxxxx().go();
}
}



Alternative (Simplified) Notation (JavaFBP-2.0+)

In the latest release of JavaFBP, we have introduced a new, simplified notation, in addition to that shown above.  In this notation connect specifies two character strings, and initialize specifies an object and a character string.   In both cases, the second character string specifies a combination of component and port, with the two parts separated by a period. Array port indices, if required, are specified using square brackets, e.g.

"component.port[3]"

The old port notation will still be supported, but is only really needed when the port index is a variable.  When debugging, it will be noted that the square bracket notation is used in trace lines, even when it was not used in the network definition.

Component names must of course not include periods or most special characters, but they may include blanks, numerals, hyphens and underscores, and they must be associated with their implementing class using a (preceding) component statement.

Here is the above network using the new notation:

 
public class xxxxxx extends Network {

protected void define() {
component("Read Masters",Read.class);
component("Read Details",Read.class);
component("Collate",Collate.class);
component("Process Merged Stream",Proc.class);
component("Write New Masters",Write.class);
component("Summary & Errors",Report.class);
connect("Read Masters.OUT", "Collate.IN[0]");
connect("Read Details.OUT", "Collate.IN[1]");
connect("Collate.OUT"), "Process Merged Stream.IN");
connect("Process Merged Stream.OUTM", "Write New Masters.IN");
connect("Process Merged Stream.OUTSE", "Summary & Errors.IN");
initialize(new FileReader("c:\\mastfile"), "Read Masters.SOURCE");
initialize(new FileReader("c:\\detlfile"), "Read Details.SOURCE");
initialize(new FileWriter("c:\\newmast"),
"Write New Masters.DESTINATION");
}


public static void main(String[] argv) throws Exception {
 // as of JavaFBP-2.4
new xxxxxx().go();
}
}

Here is a network example showing how variable port numbers can be used with the LoadBalance function to define an (admittedly fairly trivial) self-balancing network.  This also shows a slightly different way of specifying the define function.


public class TestLoadBalancer {

public static void main(final String[] args) {
try {
new Network() {
@Override
protected void define() {
int multiplex_factor = 10;
component("generate", Generate.class);
component("display", WriteToConsole.class);
component("lbal", LoadBalance.class);
connect("generate.OUT", "lbal.IN");
initialize("100 ", component("generate"), port("COUNT"));
for (int i = 0; i < multiplex_factor; i++) {
connect(component("lbal"), port("OUT", i),
component("passthru" + i, Passthru.class), port("IN"));
connect(component("passthru" + i), port("OUT"), "display.IN");
}
}
}.go();
} catch (Exception e) {
System.err.println("Error:");
e.printStackTrace();
}
}
}

Sample JavaFBP Component

A more complete description of the API is given in the next section.

This component generates a stream of 'n' IPs, where 'n' is specified in an InitializationConnection (specified by an initialize clause in the foregoing). Each IP just contains an arbitrary string of characters, in order to illustrate the concept.  Of course any copyright information included is up to the developer.

package com.jpmorrsn.fbp.components;

import com.jpmorrsn.fbp.engine.*;


/** Component to generate a stream of 'n' packets, where 'n' is
* specified in an InitializationConnection.
*/

@OutPort(value = "OUT", description = "Generated stream",
type = String.class)
@ComponentDescription(
"Generates stream of packets under control of a counter")
@InPort(value = "COUNT",
description = "Count of packets to be generated",
type = String.class)

public class Generate extends Component {

static final String copyright = "Copyright .....";



OutputPort outport;

InputPort count;



@Override
protected void execute() {
Packet ctp = count.receive();
if (ctp == null) {
return;
}
count.close();

String cti = (String) ctp.getContent();
cti = cti.trim();
int ct = 0;
try {
ct = Integer.parseInt(cti);
} catch (NumberFormatException e) {
e.printStackTrace();
}
drop(ctp);

for (int i = 0; i < ct; i++) {
int j = ct - i;
Integer j2 = new Integer(j);
String s = j2.toString();
if (j < 10) {
s = "0" + s;
}
if (j < 100) {
s = "0" + s;
}
s = s + "abc";

Packet p = create(s);
outport.send(p);

}

}

/* As of JavaFBP-2.3, the information in introspect() is now covered
by the metadata, which has at the same time been expanded to add
descriptive information.

introspect() is no longer needed - it will be ignored if present.

public Object[] introspect() { // was 'private' - must be 'public'
return new Object[] {
"generates a set of Packets under control of a counter" ,
"OUT", "output", String.class,
"lines read",
"COUNT", "parameter", Integer.class,
"Count of number of entities to be generated"};
}

*/
@Override

protected void openPorts() {

outport = openOutput("OUT");
count = openInput("COUNT");

}
}

The scheduling rules for most FBP implementations are described in the chapter of my book called Scheduling Rules.

The previous Java implementation of FBP (javaFBP-1.5.3) presents an IIP to a component once per activation. This has been changed as of JavaFBP-2.0 to once per invocation.  In practice this will only affect "non-loopers" (components that get reactivated multiple times).  

There are a few minor changes to component code as of JavaFBP-2.0:

and a major change - metadata, as shown in the above component.  This works as follows:

A new service has been added for component code as of JavaFBP-2.2:

To support isConnected, a new metadata attribute called optional has been added to @OutPort, e.g.


JavaFBP Component API

Component Metadata:

Note: when "value" is the only parameter, the "short" form
(see above) can be used

@ComponentDescription
parameters:
- value (String)

@InPort
parameters:
- value (String)
- arrayPort (boolean)
- description (String)
- type (class)

@OutPort
parameters:
- value (String)
- arrayPort (boolean)
- description (String)
- type (class)
- optional (boolean)

@InPorts
parameter: list of @InPort references, e.g. @InPorts( { @InPort("IN"),
@InPort("TEST") })

@OutPorts
parameter: list of @OutPort references, e.g. @OutPorts( { @OutPort("ACC"),
@OutPort("REJ") })


@MustRun

@Priority(Thread.MAX_PRIORITY) // default is NORM_PRIORITY


Packet class:

/**
* A Packet may either contain an Object, when type is NORMAL,
* or a String, when type is not NORMAL. The latter case
* is used for things like open and close brackets (where the
* String will be the name of a group. e.g. accounts)
**/

Object getAttribute(String key); /* key accesses a specific attribute */
Object getContent(); /* returns null if type <> NORMAL */


Component class:

/**
* All verbs must extend this class, defining its two abstract methods:
* openPorts, and execute.
**/


Packet p = create(Object o);


Packet p = create(Packet.type (int) t, String s);

drop(Packet p); // Note this change!

longWaitStart(double interval); // in seconds
longWaitEnd();



/** 3 stack methods - as of JavaFBP-2.3
**/

push (Packet p);

Packet p = pop(); // return null if empty

int stackSize();


InputPort interface:

Packet = receive();

void close();


OutputPort class:

void send(Packet packet);

boolean isConnected(); // as of 2.2

void close();