Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add logic to get jobs #23

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 74 additions & 66 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,111 +223,119 @@ export class AppModule {

Register schedule module.

| field | type | required | description |
| --- | --- | --- | --- |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.logger | LoggerService \| boolean | false | custom schedule logger, default is console |
| config.waiting | boolean | false | the scheduler will not schedule job when this job is running, if waiting is true |
| field | type | required | description |
| -------------------- | ------------------------ | -------- | -------------------------------------------------------------------------------- |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.logger | LoggerService \| boolean | false | custom schedule logger, default is console |
| config.waiting | boolean | false | the scheduler will not schedule job when this job is running, if waiting is true |

### class Schedule

#### scheduleCronJob(key: string, cron: string, callback: JobCallback, config?: ICronJobConfig)

Schedule a cron job.

| field | type | required | description |
| --- | --- | --- | --- |
| key | string | true | The unique job key |
| cron | string | true | The cron expression |
| callback | () => Promise<boolean> | boolean | If return true in callback function, the schedule will cancel this job immediately |
| config.startTime | Date | false | The start time of this job |
| config.endTime | Date | false | The end time of this job |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.waiting | boolean | false | the scheduler will not schedule job when this job is running, if waiting is true |
| config.immediate | boolean | false | running job immediately |
| field | type | required | description |
| -------------------- | ---------------------------- | -------- | ---------------------------------------------------------------------------------- |
| key | string | true | The unique job key |
| cron | string | true | The cron expression |
| callback | () => Promise<boolean> | boolean | If return true in callback function, the schedule will cancel this job immediately |
| config.startTime | Date | false | The start time of this job |
| config.endTime | Date | false | The end time of this job |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.waiting | boolean | false | the scheduler will not schedule job when this job is running, if waiting is true |
| config.immediate | boolean | false | running job immediately |

#### scheduleIntervalJob(key: string, interval: number, callback: JobCallback, config?: IJobConfig)

Schedule a interval job.

| field | type | required | description |
| --- | --- | --- | --- |
| key | string | true | The unique job key |
| interval | number | true | milliseconds |
| callback | () => Promise<boolean> | boolean | If return true in callback function, the schedule will cancel this job immediately |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.waiting | boolean | false | the scheduler will not schedule job when this job is running, if waiting is true |
| config.immediate | boolean | false | running job immediately |
| field | type | required | description |
| -------------------- | ---------------------------- | -------- | ---------------------------------------------------------------------------------- |
| key | string | true | The unique job key |
| interval | number | true | milliseconds |
| callback | () => Promise<boolean> | boolean | If return true in callback function, the schedule will cancel this job immediately |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.waiting | boolean | false | the scheduler will not schedule job when this job is running, if waiting is true |
| config.immediate | boolean | false | running job immediately |

#### scheduleTimeoutJob(key: string, timeout: number, callback: JobCallback, config?: IJobConfig)

Schedule a timeout job.

| field | type | required | description |
| --- | --- | --- | --- |
| key | string | true | The unique job key |
| timeout | number | true | milliseconds |
| callback | () => Promise<boolean> | boolean | If return true in callback function, the schedule will cancel this job immediately |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.immediate | boolean | false | running job immediately |
| field | type | required | description |
| -------------------- | ---------------------------- | -------- | ---------------------------------------------------------------------------------- |
| key | string | true | The unique job key |
| timeout | number | true | milliseconds |
| callback | () => Promise<boolean> | boolean | If return true in callback function, the schedule will cancel this job immediately |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.immediate | boolean | false | running job immediately |

#### cancelJob(key: string)

Cancel job.

#### getJobIds(): string[]

Get list of all job ids

#### getJobById(id: string):IJob

Get job by id


## Decorators

### Cron(expression: string, config?: ICronJobConfig): MethodDecorator

Schedule a cron job.

| field | type | required | description |
| --- | --- | --- | --- |
| expression | string | true | the cron expression |
| config.key | string | false | The unique job key |
| config.startTime | Date | false | the job's start time |
| config.endTime | Date | false | the job's end time |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.waiting | boolean | false | the scheduler will not schedule job when this job is running, if waiting is true |
| config.immediate | boolean | false | running job immediately |
| field | type | required | description |
| -------------------- | ------- | -------- | -------------------------------------------------------------------------------- |
| expression | string | true | the cron expression |
| config.key | string | false | The unique job key |
| config.startTime | Date | false | the job's start time |
| config.endTime | Date | false | the job's end time |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.waiting | boolean | false | the scheduler will not schedule job when this job is running, if waiting is true |
| config.immediate | boolean | false | running job immediately |

### Interval(milliseconds: number, config?: IJobConfig): MethodDecorator

Schedule a interval job.

| field | type | required | description |
| --- | --- | --- | --- |
| milliseconds | number | true | milliseconds |
| config.key | string | false | The unique job key |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.waiting | boolean | false | the scheduler will not schedule job when this job is running, if waiting is true |
| config.immediate | boolean | false | running job immediately |
| field | type | required | description |
| -------------------- | ------- | -------- | -------------------------------------------------------------------------------- |
| milliseconds | number | true | milliseconds |
| config.key | string | false | The unique job key |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.waiting | boolean | false | the scheduler will not schedule job when this job is running, if waiting is true |
| config.immediate | boolean | false | running job immediately |

### Timeout(milliseconds: number, config?: IJobConfig): MethodDecorator

Schedule a timeout job.

| field | type | required | description |
| --- | --- | --- | --- |
| milliseconds | number | true | milliseconds |
| config.key | string | false | The unique job key |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.immediate | boolean | false | running job immediately |
| field | type | required | description |
| -------------------- | ------- | -------- | ----------------------------------------------------- |
| milliseconds | number | true | milliseconds |
| config.key | string | false | The unique job key |
| config.enable | boolean | false | default is true, when false, the job will not execute |
| config.maxRetry | number | false | the max retry count, default is -1 not retry |
| config.retryInterval | number | false | the retry interval, default is 5000 |
| config.immediate | boolean | false | running job immediately |

### InjectSchedule(): PropertyDecorator

Expand Down
10 changes: 6 additions & 4 deletions lib/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ export class Executor {
async execute(
jobKey: string,
callback: () => Promise<Stop> | Stop,
tryLock: Promise<TryLock> | TryLock,
args: any[],
tryLock?: Promise<TryLock> | TryLock,
): Promise<Stop> {
let release;
if (typeof tryLock === 'function') {
Expand All @@ -32,7 +33,7 @@ export class Executor {
}
}

const result = await this.run(jobKey, callback);
const result = await this.run(jobKey, callback, args);

try {
typeof release === 'function' ? release() : void 0;
Expand All @@ -49,10 +50,11 @@ export class Executor {

private async run(
jobKey: string,
callback: () => Promise<Stop> | Stop,
callback: (...args: any[]) => Promise<Stop> | Stop,
args: any[],
): Promise<Stop> {
try {
const result = await callback();
const result = await callback(...args);
this.clear();
return result;
} catch (e) {
Expand Down
29 changes: 26 additions & 3 deletions lib/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,48 @@ export class Schedule {
cron: string,
callback: JobCallback,
config?: ICronJobConfig,
tryLock?: Promise<TryLock> | TryLock,
Copy link
Owner

@miaowing miaowing May 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank your for your pull request.
Dynamic job is not support distribution now, because the job is not sync to other nodes in cluster, tryLock function in this has no effect. Maybe I will resolve it in the future.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it still ok for some cases. In my project, user can upload csv files. My app (which is running in multiple instances) will try to pick up the csv file to process data. In this case, I have to make sure that the mutex lock is good enough to not let other instances pick up the same file.

) {
this.scheduler.scheduleCronJob(key, cron, callback, config);
this.scheduler.scheduleCronJob(key, cron, callback, config, tryLock);
}

public scheduleIntervalJob(
key: string,
interval: number,
callback: JobCallback,
config?: IJobConfig,
tryLock?: Promise<TryLock> | TryLock,
) {
this.scheduler.scheduleIntervalJob(key, interval, callback, config);
this.scheduler.scheduleIntervalJob(
key,
interval,
callback,
config,
tryLock,
);
}

public scheduleTimeoutJob(
key: string,
timeout: number,
callback: JobCallback,
config?: IJobConfig,
tryLock?: Promise<TryLock> | TryLock,
) {
this.scheduler.scheduleTimeoutJob(key, timeout, callback, config);
this.scheduler.scheduleTimeoutJob(key, timeout, callback, config, tryLock);
}

/**
* Get all registered jobs
*/
public getJobIds() {
return this.scheduler.getJobIds();
}

/**
* Get jobs by ids
*/
public getJobById(id: string) {
return this.scheduler.getJobById(id);
}
}
39 changes: 35 additions & 4 deletions lib/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ export class Scheduler {
}
}

private static parseDate(inp: any): Date {
const ret = new Date(inp);
if (Object.prototype.toString.call(ret) === '[object Date]') {
// it is a date
if (isNaN(ret.getTime())) {
// d.valueOf() could also work
return undefined;
} else {
return ret;
}
} else {
return undefined;
}
}

public static scheduleCronJob(
key: string,
cron: string,
Expand All @@ -80,13 +95,15 @@ export class Scheduler {
tryLock?: Promise<TryLock> | TryLock,
) {
const configs = Object.assign({}, defaults, config);
const startTime = this.parseDate(config && config.startTime);
const endTime = this.parseDate(config && config.endTime);
const instance = schedule.scheduleJob(
{
start: config.startTime,
end: config.endTime,
start: startTime,
end: endTime,
rule: cron,
},
async () => {
async (...args: any[]) => {
const job = this.jobs.get(key);
if (configs.waiting && job.status !== READY) {
return false;
Expand All @@ -99,7 +116,7 @@ export class Scheduler {
);

job.status = READY;
const needStop = await executor.execute(key, cb, tryLock);
const needStop = await executor.execute(key, cb, args, tryLock);
if (needStop) {
this.cancelJob(key);
}
Expand Down Expand Up @@ -206,4 +223,18 @@ export class Scheduler {
this.cancelJob(key);
}
}

/**
* Get all registered jobs
*/
public static getJobIds() {
return [...this.jobs.keys()];
}

/**
* Get jobs by ids
*/
public static getJobById(id: string) {
return this.jobs.has(id) ? this.jobs.get(id) : undefined;
}
}