Class QueueWireTapInput
- java.lang.Object
-
- com.swiftmq.impl.streams.comp.io.QueueWireTapInput
-
- All Implemented Interfaces:
- DestinationInput, Input, WireTapSubscriber
public class QueueWireTapInput extends java.lang.Object implements DestinationInput, WireTapSubscriber
Creates or re-uses a named WireTap on a Queue and adds a subscriber to it. Messages are sent to the WireTap when the Message is inserted into the Queue. If the WireTap has multiple subscribers (that is, several Streams are using the same WireTap name), Messages are distributed evenly over the subscribers in a round-robin fashion.
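The round-robin distribution over subscribers that share a WireTap name can be pictured with a small stand-alone model. This is a sketch only and not SwiftMQ code: the class RoundRobinWireTapSketch, its methods, and the use of plain Strings as messages are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only (hypothetical class, not part of SwiftMQ).
// Each subscriber is represented by a list that collects its messages.
class RoundRobinWireTapSketch {
    private final List<List<String>> subscribers = new ArrayList<>();
    private int next = 0; // index of the subscriber that receives the next message

    // Registers a subscriber and returns the list that collects its messages.
    List<String> addSubscriber() {
        List<String> inbox = new ArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }

    // Hands the message to exactly one subscriber, then advances the cursor.
    void putMessage(String message) {
        if (subscribers.isEmpty()) {
            return; // no subscriber registered, nothing to do
        }
        subscribers.get(next).add(message);
        next = (next + 1) % subscribers.size();
    }

    public static void main(String[] args) {
        RoundRobinWireTapSketch tap = new RoundRobinWireTapSketch();
        List<String> a = tap.addSubscriber();
        List<String> b = tap.addSubscriber();
        for (int i = 1; i <= 5; i++) {
            tap.putMessage("m" + i);
        }
        System.out.println(a); // [m1, m3, m5]
        System.out.println(b); // [m2, m4]
    }
}
```

Each Message reaches exactly one of the subscribers, so two Streams on the same WireTap name each see only a share of the Messages.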
By default, a deep copy of a Message is performed before it is transferred to a subscriber, so it is safe to change it. If a subscriber will not change the Message, the deep copy can be disabled. But be careful! Without the deep copy, all changes to a Message will have side effects.
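The side effect of disabling the deep copy can be shown with a simplified model in which a message is just a mutable map of properties. The class DeepCopySketch and its transfer method are hypothetical and only illustrate the copy-versus-share distinction; they are not the SwiftMQ implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only (hypothetical class, not part of SwiftMQ).
class DeepCopySketch {
    // Returns the instance a subscriber would receive for the given setting.
    // Copying the map is sufficient here because the String values are immutable.
    static Map<String, String> transfer(Map<String, String> original, boolean requiresDeepCopy) {
        return requiresDeepCopy ? new HashMap<>(original) : original;
    }

    public static void main(String[] args) {
        Map<String, String> queued = new HashMap<>();
        queued.put("status", "new");

        // With a copy, the subscriber's change stays local.
        Map<String, String> copy = transfer(queued, true);
        copy.put("status", "changed");
        System.out.println(queued.get("status")); // new

        // Without a copy, the subscriber changes the shared instance.
        Map<String, String> shared = transfer(queued, false);
        shared.put("status", "changed");
        System.out.println(queued.get("status")); // changed
    }
}
```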
Messages are transferred to a WireTap subscriber outside of a transaction. The subscriber uses a BlockingQueue to receive the Messages. The BlockingQueue has a bufferSize, which is 10 Messages by default. If the buffer is full, the Queue waits until there is free space. The time to wait is specified in maxBlockTime, with a default of 500 ms. If the timeout is reached, the Message is not inserted, so a WireTap subscriber may not receive all Messages transferred through the Queue. For that reason, a WireTap is a good solution when Message loss can be tolerated, e.g. in statistics scenarios.
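The buffering behavior described above resembles a timed offer on a bounded java.util.concurrent.BlockingQueue. The following is a minimal sketch under that assumption; the class BoundedTapBufferSketch, its methods, and the dropped counter are hypothetical, and the actual internals of QueueWireTapInput may differ.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative model only (hypothetical class, not part of SwiftMQ).
class BoundedTapBufferSketch {
    private final BlockingQueue<String> buffer;
    private final long maxBlockTimeMillis;
    private int dropped = 0; // messages that timed out and were not inserted

    BoundedTapBufferSketch(int bufferSize, long maxBlockTimeMillis) {
        this.buffer = new ArrayBlockingQueue<>(bufferSize);
        this.maxBlockTimeMillis = maxBlockTimeMillis;
    }

    // Waits up to maxBlockTimeMillis for free space; returns false if the message was dropped.
    boolean putMessage(String message) {
        boolean accepted;
        try {
            accepted = buffer.offer(message, maxBlockTimeMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            accepted = false;
        }
        if (!accepted) {
            dropped++;
        }
        return accepted;
    }

    // Removes and returns the oldest buffered message, or null if the buffer is empty.
    String next() {
        return buffer.poll();
    }

    int dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        // Small values so the demo finishes quickly; the documented defaults are 10 and 500 ms.
        BoundedTapBufferSketch tap = new BoundedTapBufferSketch(2, 50);
        tap.putMessage("m1");
        tap.putMessage("m2");
        tap.putMessage("m3"); // buffer is full and nobody consumes, so this one times out
        System.out.println(tap.dropped()); // 1
        System.out.println(tap.next());    // m1
    }
}
```

A slow consumer therefore loses Messages rather than stalling the Queue for longer than maxBlockTime per insert, which is the trade-off the description points out.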
- Author:
- IIT Software GmbH, Muenster/Germany, (c) 2017, All Rights Reserved
-
-
Method Summary
Modifier and Type, Method, and Description
QueueWireTapInput bufferSize(int bufferSize)
Sets the bufferSize of the internal BlockingQueue.
void close()
Closes this Input.
void collect(long interval)
Internal use.
Message current()
Returns the current Message of this Input.
Input current(Message current)
Sets the current Message on this Input.
java.lang.String destinationName()
Returns the destinationName.
DestinationInput destinationName(java.lang.String destinationName)
Sets the destinationName if different from the name used in stream.create().input(name).
void executeCallback()
Internal use.
QueueMessageProcessor getMessageProcessor()
Internal use.
java.lang.String getName()
Returns the name of this Input.
java.lang.String getSelector()
Internal use.
boolean isSelected(MessageImpl message)
Internal use.
QueueWireTapInput maxBlockTime(long maxBlockTime)
Sets the maximum block time when the internal BlockingQueue is full.
Message next()
Internal use.
DestinationInput onInput(InputCallback callback)
Sets the onInput callback.
void putMessage(MessageImpl message)
Internal use.
boolean requieresDeepCopy()
Returns whether Message inserts require a deep copy (default is true).
QueueWireTapInput requiresDeepCopy(boolean requiresDeepCopy)
Sets whether Message inserts require a deep copy so that the Message can be changed later on.
DestinationInput selector(java.lang.String selector)
Sets the JMS Message Selector.
void setMessageProcessor(QueueMessageProcessor messageProcessor)
Internal use.
void start()
Starts this Input.
java.lang.String toString()
-
-
-
Method Detail
-
getMessageProcessor
public QueueMessageProcessor getMessageProcessor()
Description copied from interface: DestinationInput
Internal use.
- Specified by:
- getMessageProcessor in interface DestinationInput
-
destinationName
public DestinationInput destinationName(java.lang.String destinationName)
Description copied from interface: DestinationInput
Sets the destinationName if different from the name used in stream.create().input(name).
- Specified by:
- destinationName in interface DestinationInput
- Parameters:
- destinationName - destinationName
- Returns:
- DestinationInput
-
destinationName
public java.lang.String destinationName()
Description copied from interface: DestinationInput
Returns the destinationName.
- Specified by:
- destinationName in interface DestinationInput
- Returns:
- destinationName
-
bufferSize
public QueueWireTapInput bufferSize(int bufferSize)
Sets the bufferSize of the internal BlockingQueue. Default is 10 Messages.
- Parameters:
- bufferSize - Buffer Size
- Returns:
- this
-
maxBlockTime
public QueueWireTapInput maxBlockTime(long maxBlockTime)
Sets the maximum block time when the internal BlockingQueue is full. Default is 500 ms. If the time is reached, the Message will not be inserted.
- Parameters:
- maxBlockTime - Max Block Time
- Returns:
- this
-
requieresDeepCopy
public boolean requieresDeepCopy()
Returns whether Message inserts require a deep copy (default is true).
- Specified by:
- requieresDeepCopy in interface WireTapSubscriber
- Returns:
- deepCopy flag
-
requiresDeepCopy
public QueueWireTapInput requiresDeepCopy(boolean requiresDeepCopy)
Sets whether Message inserts require a deep copy so that the Message can be changed later on. Default is true.
- Parameters:
- requiresDeepCopy - deepCopy flag
- Returns:
- this
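Because bufferSize, maxBlockTime, and requiresDeepCopy each return this, the three settings can be chained. The sketch below mirrors that fluent style with a hypothetical stand-in class, TapSettingsSketch, using the documented defaults (10 Messages, 500 ms, true); it is not the real QueueWireTapInput, and its getters are assumptions added for illustration.

```java
// Illustrative model only (hypothetical class, not part of SwiftMQ).
class TapSettingsSketch {
    private int bufferSize = 10;             // documented default: 10 Messages
    private long maxBlockTime = 500;         // documented default: 500 ms
    private boolean requiresDeepCopy = true; // documented default: true

    // Each setter returns this so that calls can be chained.
    TapSettingsSketch bufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
        return this;
    }

    TapSettingsSketch maxBlockTime(long maxBlockTime) {
        this.maxBlockTime = maxBlockTime;
        return this;
    }

    TapSettingsSketch requiresDeepCopy(boolean requiresDeepCopy) {
        this.requiresDeepCopy = requiresDeepCopy;
        return this;
    }

    int getBufferSize() { return bufferSize; }
    long getMaxBlockTime() { return maxBlockTime; }
    boolean isDeepCopyRequired() { return requiresDeepCopy; }

    public static void main(String[] args) {
        // A read-only statistics consumer: larger buffer, shorter wait, no copy.
        TapSettingsSketch settings = new TapSettingsSketch()
                .bufferSize(100)
                .maxBlockTime(100)
                .requiresDeepCopy(false);
        System.out.println(settings.getBufferSize());      // 100
        System.out.println(settings.getMaxBlockTime());    // 100
        System.out.println(settings.isDeepCopyRequired()); // false
    }
}
```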
-
isSelected
public boolean isSelected(MessageImpl message)
Internal use.
- Specified by:
- isSelected in interface WireTapSubscriber
-
putMessage
public void putMessage(MessageImpl message)
Internal use.
- Specified by:
- putMessage in interface WireTapSubscriber
-
next
public Message next()
Internal use.
-
setMessageProcessor
public void setMessageProcessor(QueueMessageProcessor messageProcessor)
Description copied from interface: DestinationInput
Internal use.
- Specified by:
- setMessageProcessor in interface DestinationInput
-
getName
public java.lang.String getName()
Description copied from interface: Input
Returns the name of this Input.
-
getSelector
public java.lang.String getSelector()
Internal use.
- Specified by:
- getSelector in interface DestinationInput
- Returns:
- JMS Message Selector
-
selector
public DestinationInput selector(java.lang.String selector)
Description copied from interface: DestinationInput
Sets the JMS Message Selector.
- Specified by:
- selector in interface DestinationInput
- Parameters:
- selector - JMS Message Selector
- Returns:
- DestinationInput
-
current
public Input current(Message current)
Description copied from interface: Input
Sets the current Message on this Input.
-
current
public Message current()
Description copied from interface: Input
Returns the current Message of this Input.
-
onInput
public DestinationInput onInput(InputCallback callback)
Description copied from interface: DestinationInput
Sets the onInput callback.
- Specified by:
- onInput in interface DestinationInput
- Parameters:
- callback - callback
- Returns:
- DestinationInput
-
executeCallback
public void executeCallback() throws java.lang.Exception
Description copied from interface: Input
Internal use.
- Specified by:
- executeCallback in interface Input
- Throws:
- java.lang.Exception
-
collect
public void collect(long interval)
Description copied from interface: Input
Internal use.
-
start
public void start() throws java.lang.Exception
Description copied from interface: Input
Starts this Input. This method is called automatically if an Input is created outside a callback. If it is created inside, it must be called explicitly.
-
close
public void close()
Description copied from interface: Input
Closes this Input.
-
toString
public java.lang.String toString()
- Overrides:
- toString in class java.lang.Object
-
-