瀏覽代碼

Code cleanup.

tags/1.0.2
Yuk-Ming Lam 4 年之前
父節點
當前提交
0b260d7e58
共有 2 個檔案被更改,包括 7 行新增126 行删除
  1. +1
    -36
      src/main/java/altk/comm/engine/Broadcast.java
  2. +6
    -90
      src/main/java/altk/comm/engine/Postback.java

+ 1
- 36
src/main/java/altk/comm/engine/Broadcast.java 查看文件

@@ -76,14 +76,10 @@ public abstract class Broadcast
private int jobsTotal; private int jobsTotal;
private int scheduledJobs; private int scheduledJobs;
protected int transactions;
protected int completedTransactions;

public static enum BroadcastState public static enum BroadcastState
{ {
ACCEPTED, ACCEPTED,
RUNNING, RUNNING,
// ALLDONE,
PAUSING, PAUSING,
PAUSED, PAUSED,
CANCELING, CANCELING,
@@ -187,7 +183,6 @@ public abstract class Broadcast
BroadcastState.PURGED, // User action BroadcastState.PURGED, // User action
BroadcastState.ABORTED, // Service provider irrecoverable error BroadcastState.ABORTED, // Service provider irrecoverable error
BroadcastState.EXPIRED BroadcastState.EXPIRED
// BroadcastState.ALLDONE // Natural transition, if all ongoing calls complete and no more jobs in Dispatcher queues.
)); ));
// Transitions from CANCELING // Transitions from CANCELING
@@ -211,10 +206,6 @@ public abstract class Broadcast
BroadcastState.CANCELING, // User action BroadcastState.CANCELING, // User action
BroadcastState.PURGED // User action BroadcastState.PURGED // User action
)); ));
// Transitions from ALLDONE
//toStates.put(BroadcastState.ALLDONE, Arrays.asList(
// BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more jobs in post queue
// ));
} }
public static class StateChangeResult public static class StateChangeResult
@@ -352,7 +343,6 @@ public abstract class Broadcast
// Service the jobs // Service the jobs
try try
{ {
incrementTransactions(batch.size());
processJobs(batch, serviceProviderPeer, prerequisites); processJobs(batch, serviceProviderPeer, prerequisites);
} }
catch (EngineException e) catch (EngineException e)
@@ -410,15 +400,6 @@ public abstract class Broadcast
{ {
return readyQueue.size() + scheduledJobs; return readyQueue.size() + scheduledJobs;
} }
public void incrementTransactions(int size) {
transactions += size;
}
public void incrementCompletedTransactions(int size)
{
completedTransactions += size;
}
/** /**
* Experimental formulation where it takes over directing * Experimental formulation where it takes over directing
@@ -1135,7 +1116,6 @@ public abstract class Broadcast
// No more rescheduling on cancel, expire, or pause // No more rescheduling on cancel, expire, or pause
completedJobCount++; completedJobCount++;
logJobCount("Completed a job"); logJobCount("Completed a job");
//if (allDone() && state==BroadcastState.RUNNING) setState(BroadcastState.ALLDONE);
} }
else if (rescheduleTimeMS == 0) else if (rescheduleTimeMS == 0)
{ {
@@ -1196,8 +1176,7 @@ public abstract class Broadcast
case PAUSED: case PAUSED:
return pendingJobs(); return pendingJobs();
default: default:
return 0;
return 0;
} }
} }


@@ -1210,18 +1189,4 @@ public abstract class Broadcast
return broadcastType; return broadcastType;
} }
public boolean allDone() {
return (completedJobCount == jobsTotal);
}
/*
public boolean stopOnEmptyPostQueue()
{
if (state == state.ALLDONE) return true;
// Add the ALLDONE message to postQueue
setState(BroadcastState.ALLDONE);
// Do not stop as one report has just been added by the previous setState().
return false;
}
*/
} }

+ 6
- 90
src/main/java/altk/comm/engine/Postback.java 查看文件

@@ -43,9 +43,6 @@ import org.apache.log4j.Logger;
import org.w3c.dom.Document; import org.w3c.dom.Document;
import org.w3c.dom.Node; import org.w3c.dom.Node;


import altk.comm.engine.Broadcast.BroadcastState;
import altk.comm.engine.CommonLogger;



/** /**
* Queues JobReports to be posted back to attribute postBackURL. * Queues JobReports to be posted back to attribute postBackURL.
@@ -124,85 +121,12 @@ public class Postback
List<String> reportList = new ArrayList<String>(); List<String> reportList = new ArrayList<String>();
synchronized(postQueue) synchronized(postQueue)
{ {
// Each iteration examines the queue for a batch to send
//for (;;)
//{
//reportList = new ArrayList<String>();
for (int i = 0; i < maxBatchSize ; i++)
{
report = postQueue.poll();
if (report == null) break;
reportList.add(report);
}
/*
if (reportList.size() > 0)
{
myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size()));
postQueue.notifyAll();
break; // break out to do the work.
}TODO Auto-generated catch block
// No reports
BroadcastState toState = null;
String reason = null;
switch (broadcast.getState())
{
case ALLDONE:
toState = BroadcastState.COMPLETED;
reason = "All posted";
break;
case CANCELING:
if (broadcast.getActiveJobCount() > 0) break;
toState = BroadcastState.CANCELED;
reason = "User canceled";
break;
case PAUSING:
if (broadcast.getActiveJobCount() > 0) break;
toState = BroadcastState.PAUSED;
reason = "User paused";
break;
default:
}
if (toState != null)
{
if (toState.isFinal)
{
broadcast.terminate(toState, reason);
myLogger.info("All posted, thread terminating");
return;
}
else
{
broadcast.setState(toState);
return;
}
}
if (broadcast.getState().isFinal)
{
// No more. Notify all waiting postback threads and exit thread
myLogger.info("All posted, thread terminating");
postQueue.notifyAll();
return;
}
// Nothing to do, so wait a while, and look at the
// queue again.

try
{
myLogger.debug("Going to wait " + QUEUE_WAIT * 1000);
postQueue.wait(); //QUEUE_WAIT * 1000);
}
catch (InterruptedException e)
{
CommonLogger.alarm.info("Postback queue interrupted while waiting: " + e);
break;
}
CommonLogger.health.info("Surfacing from wait");
System.out.println(getName() + " surfacing from wait");
continue;
*/
// }
for (int i = 0; i < maxBatchSize ; i++)
{
report = postQueue.poll();
if (report == null) break;
reportList.add(report);
}
} // synchronized() } // synchronized()
if (reportList.size() > 0) if (reportList.size() > 0)
{ {
@@ -213,14 +137,6 @@ public class Postback
break; break;
case SERVER_IO_ERROR: case SERVER_IO_ERROR:
/* Should not requeue report for this may lead to dead lock on this queu.
// TODO: Limit retries, using rate limiting. Posting can be recovered using the activity log.
// Re-queue these reports
for (String rpt : reportList)
{
queueReport(rpt);
}
*/
// Sleep for a while before retrying this PostBack server. // Sleep for a while before retrying this PostBack server.
CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds"); CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds");
try try


Loading…
取消
儲存