这是indexloc提供的服务,不要输入任何密码
Skip to content
This repository was archived by the owner on May 17, 2021. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;

import org.openhab.io.transport.cul.CULCommunicationException;
import org.openhab.io.transport.cul.CULDeviceException;
Expand All @@ -35,7 +35,7 @@ public abstract class AbstractCULHandler<T extends CULConfig> implements CULHand
/**
* Thread which sends all queued commands to the CUL.
* The Thread waits on a CUL response before sending a new
* command to prevent race conditions.
* command to prevent race conditions.
*
* @author Till Klocke
* @since 1.4.0
Expand All @@ -44,55 +44,68 @@ public abstract class AbstractCULHandler<T extends CULConfig> implements CULHand
private class SendThread extends Thread implements CULListener {

private final Logger logger = LoggerFactory.getLogger(SendThread.class);

private Boolean waitOnCULResponse = false;

/**
* List of commands the CULfw does not response to and we shall not wait for
*/
private final static String async_cmds = "F";

/**
* Timeout in milliseconds the thread will wait for an response by the CUL
*/
private final static int waitForResponse_ms = 2000;

@Override
public void run() {
int waitTimeout = 0;
String command = null;

while (!isInterrupted()) {
if (!waitOnCULResponse) {
String command = sendQueue.poll();
if (command != null) {
if (!command.endsWith("\r\n")) {
command = command + "\r\n";
}
try {
logger.trace("Writing message: {}", command);

writeMessage(command);
waitOnCULResponse = true;
waitTimeout = 0;
} catch (CULCommunicationException e) {
logger.warn("Error while writing command to CUL", e);
}
}
}
try {
Thread.sleep(10);
command = sendQueue.take();
} catch (InterruptedException e) {
logger.debug("Error while sleeping in SendThread", e);
logger.warn("Failed to wait for queue: " + e.toString());
}
if (command == null) {
continue;
}
if (!command.endsWith("\r\n")) {
command = command + "\r\n";
}
try {
logger.trace("Writing message: {}", command);

waitTimeout += 1;
if (waitOnCULResponse && waitTimeout > 200) {
logger.trace("Reset wait on CUL response due to timeout");;
waitOnCULResponse = false;
writeMessage(command);
if (async_cmds.contains(command.subSequence(0, 1))) {
continue;
}
long start_ms = System.nanoTime();
waitOnCulResponse();
logger.trace("Response took {} ms", (System.nanoTime() - start_ms) / 1000000);
} catch (CULCommunicationException e) {
logger.warn("Error while writing command to CUL", e);
}
}
logger.warn("Sending thread interrupted");
}

private synchronized void waitOnCulResponse() {
try {
wait(waitForResponse_ms);
} catch (InterruptedException e) {
logger.debug("Error while sleeping in SendThread", e);
}
}

@Override
public void dataReceived(String data) {
public synchronized void dataReceived(String data) {
logger.trace("CUL response received: {}", data);
waitOnCULResponse = false;
notify();
}

@Override
public void error(Exception e) {
logger.trace("CUL error received: {}", e);
waitOnCULResponse = false;
public synchronized void error(Exception e) {
logger.debug("CUL error received: {}", e);
notify();
}
}

Expand Down Expand Up @@ -132,7 +145,7 @@ public void run() {

protected List<CULListener> listeners = new ArrayList<CULListener>();

protected Queue<String> sendQueue = new ConcurrentLinkedQueue<String>();
protected BlockingQueue<String> sendQueue = new LinkedTransferQueue<String>();
protected int credit10ms = 0;

protected AbstractCULHandler(T config) {
Expand Down Expand Up @@ -161,7 +174,7 @@ public boolean hasListeners() {
@Override
public void open() throws CULDeviceException {
openHardware();

registerListener(sendThread);
sendThread.start();
}
Expand All @@ -170,7 +183,7 @@ public void open() throws CULDeviceException {
public void close() {
sendThread.interrupt();
unregisterListener(sendThread);

closeHardware();
}

Expand All @@ -191,14 +204,18 @@ public void close() {
@Override
public void send(String command) {
if (isMessageAllowed(command)) {
sendQueue.add(command);
requestCreditReport();
if (sendQueue.offer(command)) {
requestCreditReport();
} else {
log.warn("Send buffer overrun. Doing reset");
sendQueue.clear();
}
}
}

@Override
public void sendWithoutCheck(String message) throws CULCommunicationException {
sendQueue.add(message);
sendQueue.offer(message);
}

/**
Expand Down Expand Up @@ -284,7 +301,7 @@ public int getCredit10ms() {
*/
private void requestCreditReport() {
/* this requests a report which provides credit10ms */
log.debug("Requesting credit report");
log.trace("Requesting credit report");
try {
sendWithoutCheck("X\r\n");
} catch (CULCommunicationException e) {
Expand Down