发布于 

James实现自定义安全风控

方式一

注意:该方式只能处理外站发进来的邮件,站内收发不会触发

代码

james->protocols->smtp->core->fastfail->RejectionHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package org.apache.james.protocols.smtp.core.fastfail;

import org.apache.james.core.MailAddress;
import org.apache.james.core.MaybeSender;
import org.apache.james.protocols.smtp.SMTPSession;
import org.apache.james.protocols.smtp.hook.HookResult;
import org.apache.james.protocols.smtp.hook.MailHook;
import org.apache.james.protocols.smtp.hook.RcptHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * SMTP hook that refuses a transaction when the envelope sender belongs to a blocked domain.
 *
 * <p>The same check is applied at the MAIL FROM stage ({@link #doMail}) and at the RCPT TO
 * stage ({@link #doRcpt}). A sender that is absent (as reported by
 * {@code MaybeSender.asOptional()}) is never rejected by this hook.
 *
 * @author xen
 * @date 2025/3/5
 */
public class RejectionHandler implements MailHook, RcptHook {

    private static final Logger LOGGER = LoggerFactory.getLogger(RejectionHandler.class);

    /** Address suffixes ("@domain") that are refused; matched without regard to case. */
    private static final String[] REJECTED_SUFFIXES = {"@gmail.com", "@xx.com"};

    /**
     * Checks the envelope sender at the MAIL FROM stage.
     *
     * @param session current SMTP session (not used by this hook)
     * @param sender  envelope sender announced by the client
     * @return {@code HookResult.DENY} for a blocked domain, otherwise {@code HookResult.DECLINED}
     */
    @Override
    public HookResult doMail(SMTPSession session, MaybeSender sender) {
        return evaluate(sender, "doMail");
    }

    /**
     * Checks the envelope sender again at the RCPT TO stage.
     *
     * @param session   current SMTP session (not used by this hook)
     * @param sender    envelope sender announced by the client
     * @param recipient recipient being added (not used by this hook)
     * @return {@code HookResult.DENY} for a blocked domain, otherwise {@code HookResult.DECLINED}
     */
    @Override
    public HookResult doRcpt(SMTPSession session, MaybeSender sender, MailAddress recipient) {
        return evaluate(sender, "doRcpt");
    }

    /** Shared decision logic; {@code stage} only labels the log lines. */
    private HookResult evaluate(MaybeSender sender, String stage) {
        String address = sender.asString();
        LOGGER.info("sender:{} 进入自定义拦截逻辑-{}", address, stage);
        if (sender.asOptional().isPresent() && isRejectedDomain(address)) {
            LOGGER.info("sender:{} 拒收-{}", address, stage);
            return HookResult.DENY;
        }
        LOGGER.info("sender:{} 放行-{}", address, stage);
        return HookResult.DECLINED;
    }

    /**
     * Returns whether the address ends with one of the blocked "@domain" suffixes.
     * Domain names are case-insensitive, so "user@GMAIL.COM" must match as well.
     */
    private boolean isRejectedDomain(String sender) {
        if (sender == null) {
            return false;
        }
        for (String suffix : REJECTED_SUFFIXES) {
            int offset = sender.length() - suffix.length();
            // regionMatches(true, ...) is a locale-independent, case-insensitive endsWith.
            if (offset >= 0 && sender.regionMatches(true, offset, suffix, 0, suffix.length())) {
                return true;
            }
        }
        return false;
    }
}

修改配置文件

conf/smtpserver.xml,在相应的 <handlerchain> 元素中添加该处理器的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
<smtpservers>
<smtpserver enabled="true">
<jmxName>smtpserver-global</jmxName>
<bind>0.0.0.0:25</bind>
<!-- 其他配置项 -->
<handlerchain>
<!-- 添加 RejectionHandler,尽量放在最前面 -->
<handler class="org.apache.james.protocols.smtp.core.fastfail.RejectionHandler"/>
<!-- 其他处理器 -->
</handlerchain>
</smtpserver>
<!-- 其他 smtpserver 配置 -->
</smtpservers>
1
<handler class="org.apache.james.protocols.smtp.core.fastfail.RejectionHandler"/>

方式二

修改配置文件

服务器conf目录下的mailetcontainer.xml文件

1
在<processor state="transport" enableJmx="true">标签中添加下面的代码

收发件都触发

1
<mailet match="All" class="org.apache.james.mailetcontainer.impl.matchers.RejectionMailet" />

只在收件触发

1
<mailet match="RecipientIsLocal" class="org.apache.james.mailetcontainer.impl.matchers.ReceiveEmailMailet" />

限制邮箱域

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailetcontainer.impl.matchers;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* * @author xen
* * @date 2025/3/5
**/
public class RejectionMailet extends GenericMailet {

public static final Logger LOGGER = LoggerFactory.getLogger(RejectionMailet.class);

private final List<String> rejectedDomains = Arrays.asList("xx.xx", "163.com");

@Override
public void service(Mail mail) throws MessagingException {
List<String> from = Arrays.stream(mail.getMessage().getFrom()).map(m -> ((InternetAddress) m).getAddress()).collect(Collectors.toList());
LOGGER.info("发件人:{} 进入 Mailet 自定义拦截逻辑-service", from);
if (checkFormDomain(from)) {
LOGGER.info("发件人:{} 拒绝-service", from);
mail.setState(Mail.GHOST); // 阻止进一步处理
sendRejectionNotification(mail, from);
} else {
LOGGER.info("发件人:{} 放行-service", from);
}
}

private void sendRejectionNotification(Mail originalMail, List<String> recipients) {
try {
LOGGER.info("开始发拒收通知");
MimeMessage notification = new MimeMessage(originalMail.getMessage().getSession());
// 设置发件人为 postmaster,避免被拦截
notification.setFrom(new InternetAddress("postmaster@xx.xx"));
notification.setRecipients(Message.RecipientType.TO,
InternetAddress.parse(String.join(",", recipients)));
notification.setSubject("邮件被拒收通知");
notification.setText("您的邮件因发件人域名被限制而无法送达。");
getMailetContext().sendMail(notification);
} catch (Exception e) {
LOGGER.error("发送拒收通知失败", e);
}
}

private Boolean checkFormDomain(List<String> from) {
if (rejectedDomains.isEmpty()) {
return false;
}
String regex = rejectedDomains.stream()
.filter(Objects::nonNull)
.map(domain -> Pattern.quote(domain.toLowerCase()))
.collect(Collectors.joining("|", "@(", ")$"));
Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
return from.stream()
.filter(Objects::nonNull)
.anyMatch(email -> {
String lowerEmail = email.toLowerCase();
// 匹配拒绝域名且排除特定邮箱
return pattern.matcher(lowerEmail).find()
&& !lowerEmail.equals("postmaster@xx.xx");
});
}

}

限制邮箱域和邮箱地址和指定ip

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailetcontainer.impl.matchers;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;


import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Strings;

/**
* * @author xen
* * @date 2025/3/5
**/
public class RejectionMailet extends GenericMailet {

public static final Logger LOGGER = LoggerFactory.getLogger(RejectionMailet.class);

private final List<String> rejectedDomains = Arrays.asList("", "163.com");

private final List<String> rejectedFromEmailAddress = Arrays.asList("614164@qq.com", "");

private final List<String> rejectedToEmailAddress = Arrays.asList("", "1@xx.xx");

private final List<String> rejectedSenderIPs = Arrays.asList("209.85.166.196", "203.0.113.5");


@Override
public void service(Mail mail) throws MessagingException {
List<String> from = Arrays.stream(mail.getMessage().getFrom()).map(m -> ((InternetAddress) m).getAddress()).collect(Collectors.toList());

// 获取收件人地址
List<String> to = Arrays.stream(mail.getMessage().getRecipients(Message.RecipientType.TO))
.map(m -> ((InternetAddress) m).getAddress())
.collect(Collectors.toList());

LOGGER.info("发件人:{} 进入自定义拦截逻辑", from);
String rejectionReason = getRejectionReason(from, to, mail.getRemoteAddr());
if (!Strings.isNullOrEmpty(rejectionReason)) {
mail.setState(Mail.GHOST); // 阻止进一步处理
sendRejectionNotification(mail, from, rejectionReason);
}
}

private String getRejectionReason(List<String> from, List<String> to, String senderIP) {
if (checkFormDomain(from)) {
return "Your mail cannot be delivered due to domain restrictions";
} else if (checkFormEmailAddress(from)) {
return "Your email address is restricted and cannot be delivered";
} else if (checkToEmailAddress(to)) {
return "The recipient's email address is restricted and cannot be delivered";
} else if (checkSenderIP(senderIP)) {
return "Your IP address is restricted and cannot send emails to this server";
}
return "";
}

private void sendRejectionNotification(Mail originalMail, List<String> recipients, String content) {
try {
LOGGER.info("开始发拒收通知");
MimeMessage notification = new MimeMessage(originalMail.getMessage().getSession());
// 设置发件人为 postmaster,避免被拦截
notification.setFrom(new InternetAddress("postmaster@xx.xx"));
notification.setRecipients(Message.RecipientType.TO,
InternetAddress.parse(String.join(",", recipients)));
notification.setSubject("Mail rejected notification");
notification.setText(content);
getMailetContext().sendMail(notification);
} catch (Exception e) {
LOGGER.error("发送拒收通知失败", e);
}
}

private Boolean checkFormDomain(List<String> from) {
if (rejectedDomains.isEmpty()) {
return false;
}
String regex = rejectedDomains.stream()
.filter(Objects::nonNull)
.map(domain -> Pattern.quote(domain.toLowerCase()))
.collect(Collectors.joining("|", "@(", ")$"));
Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
return from.stream()
.filter(Objects::nonNull)
.anyMatch(email -> {
String lowerEmail = email.toLowerCase();
boolean b = pattern.matcher(lowerEmail).find()
&& !lowerEmail.equals("postmaster@xx.xx");
if (b) {
LOGGER.info("发件人:{} 域被拒绝", from);
}
return b;
});
}


private Boolean checkFormEmailAddress(List<String> from) {
if (rejectedFromEmailAddress.isEmpty()) {
return false;
}
boolean b = from.stream()
.anyMatch(email -> rejectedFromEmailAddress.contains(email) && !"postmaster@xx.xx".equals(email));
if (b) {
LOGGER.info("发件人:{} 邮箱地址被拒绝", from);
}
return b;
}

private Boolean checkToEmailAddress(List<String> to) {
if (rejectedToEmailAddress.isEmpty()) {
return false;
}

LOGGER.info("收件人地址:{}", to);

boolean b = to.stream()
.anyMatch(email -> rejectedToEmailAddress.contains(email) && !"postmaster@xx.xx".equals(email));
if (b) {
LOGGER.info("收件人:{} 邮箱地址被拒绝", to);
}
return b;
}

private Boolean checkSenderIP(String senderIP) {
if (rejectedSenderIPs.isEmpty()) {
return false;
}
LOGGER.info("发件人IP地址:{}", senderIP);

boolean b = rejectedSenderIPs.contains(senderIP);
if (b) {
LOGGER.info("发件人IP:{} 被拒绝", senderIP);
}
return b;
}
}

拓展一

在配置文件中取数据

配置文件修改

mailetcontainer.xml

1
2
3
4
5
6
<mailet match="RecipientIsLocal" class="org.apache.james.mailetcontainer.impl.matchers.RejectionMailet">
<rejectedDomains>example.com,163.com</rejectedDomains>
<rejectedFromEmailAddress>xx@qq.com,someone@example.com</rejectedFromEmailAddress>
<rejectedToEmailAddress>1@xx.xx,another@example.com</rejectedToEmailAddress>
<rejectedSenderIPs>209.85.166.xx,203.0.113.x</rejectedSenderIPs>
</mailet>

代码调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailetcontainer.impl.matchers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;


import org.apache.mailet.Mail;
import org.apache.mailet.MailetConfig;
import org.apache.mailet.base.GenericMailet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Strings;

/**
* * @author xen
* * @date 2025/3/5
**/
public class RejectionMailet extends GenericMailet {

public static final Logger LOGGER = LoggerFactory.getLogger(RejectionMailet.class);

private volatile List<String> rejectedDomains = new ArrayList<>();
private volatile List<String> rejectedFromEmailAddress = new ArrayList<>();
private volatile List<String> rejectedToEmailAddress = new ArrayList<>();
private volatile List<String> rejectedSenderIPs = new ArrayList<>();

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

@Override
public void init() throws MessagingException {
super.init();
loadConfiguration();
scheduler.scheduleAtFixedRate(this::loadConfiguration, 30, 30, TimeUnit.SECONDS);
}

private void loadConfiguration() {
MailetConfig config = getMailetConfig();
rejectedDomains = parseConfig(config.getInitParameter("rejectedDomains"));
rejectedFromEmailAddress = parseConfig(config.getInitParameter("rejectedFromEmailAddress"));
rejectedToEmailAddress = parseConfig(config.getInitParameter("rejectedToEmailAddress"));
rejectedSenderIPs = parseConfig(config.getInitParameter("rejectedSenderIPs"));

LOGGER.info("配置已重新加载: rejectedDomains={}, rejectedFromEmailAddress={}, rejectedToEmailAddress={}, rejectedSenderIPs={}",
rejectedDomains, rejectedFromEmailAddress, rejectedToEmailAddress, rejectedSenderIPs);
}

private List<String> parseConfig(String configValue) {
if (Strings.isNullOrEmpty(configValue)) {
return Collections.emptyList();
}
return Arrays.stream(configValue.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
}

@Override
public void destroy() {
scheduler.shutdown();
super.destroy();
}

@Override
public void service(Mail mail) throws MessagingException {
List<String> from = Arrays.stream(mail.getMessage().getFrom()).map(m -> ((InternetAddress) m).getAddress()).collect(Collectors.toList());

// 获取收件人地址
List<String> to = Arrays.stream(mail.getMessage().getRecipients(Message.RecipientType.TO))
.map(m -> ((InternetAddress) m).getAddress())
.collect(Collectors.toList());

LOGGER.info("发件人:{} 进入自定义拦截逻辑", from);
String rejectionReason = getRejectionReason(from, to, mail.getRemoteAddr());
if (!Strings.isNullOrEmpty(rejectionReason)) {
mail.setState(Mail.GHOST); // 阻止进一步处理
sendRejectionNotification(mail, from, rejectionReason);
}
}

private String getRejectionReason(List<String> from, List<String> to, String senderIP) {
if (checkFormDomain(from)) {
return "Your mail cannot be delivered due to domain restrictions";
} else if (checkFormEmailAddress(from)) {
return "Your email address is restricted and cannot be delivered";
} else if (checkToEmailAddress(to)) {
return "The recipient's email address is restricted and cannot be delivered";
} else if (checkSenderIP(senderIP)) {
return "Your IP address is restricted and cannot send emails to this server";
}
return "";
}

private void sendRejectionNotification(Mail originalMail, List<String> recipients, String content) {
try {
LOGGER.info("开始发拒收通知");
MimeMessage notification = new MimeMessage(originalMail.getMessage().getSession());
// 设置发件人为 postmaster,避免被拦截
notification.setFrom(new InternetAddress("postmaster@xx.xx"));
notification.setRecipients(Message.RecipientType.TO,
InternetAddress.parse(String.join(",", recipients)));
notification.setSubject("Mail rejected notification");
notification.setText(content);
getMailetContext().sendMail(notification);
} catch (Exception e) {
LOGGER.error("发送拒收通知失败", e);
}
}

private Boolean checkFormDomain(List<String> from) {
if (rejectedDomains.isEmpty()) {
return false;
}
String regex = rejectedDomains.stream()
.filter(Objects::nonNull)
.map(domain -> Pattern.quote(domain.toLowerCase()))
.collect(Collectors.joining("|", "@(", ")$"));
Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
return from.stream()
.filter(Objects::nonNull)
.anyMatch(email -> {
String lowerEmail = email.toLowerCase();
boolean b = pattern.matcher(lowerEmail).find()
&& !lowerEmail.equals("postmaster@xx.xx");
if (b) {
LOGGER.info("发件人:{} 域被拒绝", from);
}
return b;
});
}


private Boolean checkFormEmailAddress(List<String> from) {
if (rejectedFromEmailAddress.isEmpty()) {
return false;
}
boolean b = from.stream()
.anyMatch(email -> rejectedFromEmailAddress.contains(email) && !"postmaster@xx.xx".equals(email));
if (b) {
LOGGER.info("发件人:{} 邮箱地址被拒绝", from);
}
return b;
}

private Boolean checkToEmailAddress(List<String> to) {
if (rejectedToEmailAddress.isEmpty()) {
return false;
}

LOGGER.info("收件人地址:{}", to);

boolean b = to.stream()
.anyMatch(email -> rejectedToEmailAddress.contains(email) && !"postmaster@xx.xx".equals(email));
if (b) {
LOGGER.info("收件人:{} 邮箱地址被拒绝", to);
}
return b;
}

private Boolean checkSenderIP(String senderIP) {
if (rejectedSenderIPs.isEmpty()) {
return false;
}
LOGGER.info("发件人IP地址:{}", senderIP);

boolean b = rejectedSenderIPs.contains(senderIP);
if (b) {
LOGGER.info("发件人IP:{} 被拒绝", senderIP);
}
return b;
}
}

拓展二

在配置文件中取数据+动态限制ip频次

配置文件修改

mailetcontainer.xml

1
2
3
4
5
6
7
8
9
<mailet match="RecipientIsLocal" class="org.apache.james.mailetcontainer.impl.matchers.RejectionMailet">
<rejectedDomains>example.com,163.com</rejectedDomains>
<rejectedFromEmailAddress>614164@qq.com,someone@example.com</rejectedFromEmailAddress>
<rejectedToEmailAddress>1@xx.xx,another@example.com</rejectedToEmailAddress>
<rejectedSenderIPs>209.85.166.196,203.0.113.5</rejectedSenderIPs>
<ipLimitTime>60000</ipLimitTime> <!-- 锁定时间,单位:秒 -->
<ipRiskControlTime>3600000</ipRiskControlTime> <!-- 风控时长,单位:秒 -->
<ipFrequency>100</ipFrequency> <!-- 风控时长内的最大请求次数 -->
</mailet>

代码调整

因为用的James代码,不想改动太多,直接在内存中操作,新增2个工具类,目录结构

1
2
3
4
5
matchers
-RejectionMailet
-utils
-CacheUtil
-ExpireMap
CacheUtil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package org.apache.james.mailetcontainer.impl.matchers.utils;

/**
 * Static facade over one shared {@link ExpireMap}, used as a small in-memory cache for
 * temporary data with a per-entry time to live.
 *
 * @author xen
 * @date 2025/3/5
 */
public final class CacheUtil {

    /** The single backing map; never reassigned. */
    private static final ExpireMap<String, Object> expireMap = new ExpireMap<>();

    /** Static utility holder; not meant to be instantiated. */
    private CacheUtil() {
    }

    /**
     * Stores a value under the key, replacing any previous value.
     *
     * @param key    cache key
     * @param val    value to store
     * @param expire time to live, passed to {@code ExpireMap.put(key, value, seconds)}
     */
    public static void put(String key, Object val, int expire) {
        expireMap.put(key, val, expire);
    }

    /**
     * Looks up a value.
     *
     * @param key cache key
     * @return whatever {@code ExpireMap.get} returns for the key
     */
    public static Object get(String key) {
        return expireMap.get(key);
    }

    /**
     * Removes the entry for the key, if any.
     *
     * @param key cache key
     */
    public static void rem(String key) {
        expireMap.remove(key);
    }
}
ExpireMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package org.apache.james.mailetcontainer.impl.matchers.utils;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A {@link ConcurrentHashMap} whose entries carry an expiry time.
 *
 * <p>Expiry is enforced in two ways: read methods overridden here ({@link #get},
 * {@link #containsKey}, {@link #containsValue}, {@link #entrySet}, {@link #size},
 * {@link #isEmpty}, {@link #values}) ignore expired entries, and a background task
 * periodically removes them. Methods inherited from {@code ConcurrentHashMap} that are
 * not overridden here (for example {@code putIfAbsent} or {@code keySet}) do not take
 * expiry into account.
 *
 * <p>Every instance starts its own single-thread daemon scheduler for the cleanup task.
 *
 * @param <K> key type
 * @param <V> value type
 * @author xen
 * @date 2025/3/5
 */
public class ExpireMap<K, V> extends ConcurrentHashMap<K, V> {

    public static final Logger LOGGER = LoggerFactory.getLogger(ExpireMap.class);

    private static final long DEFAULT_EXPIRE_SECONDS = 2 * 60;

    private static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;

    private static final long DEFAULT_CLEAN_PERIOD_SECONDS = 60 * 60;

    /** Time to live applied by {@link #put(Object, Object)} and {@link #putAll(Map)}; per instance. */
    private final long defaultExpireSeconds;

    private final ScheduledExecutorService executorService;

    /** Expiry timestamp (epoch milliseconds) for each key that was stored with a time to live. */
    private final Map<K, Long> expireDic = new ConcurrentHashMap<>();

    Map<K, Long> getExpireDic() {
        return this.expireDic;
    }

    /** Creates a map cleaned once an hour, with a default time to live of two minutes. */
    public ExpireMap() {
        this(DEFAULT_INITIAL_CAPACITY, 0, DEFAULT_CLEAN_PERIOD_SECONDS, DEFAULT_EXPIRE_SECONDS);
    }

    /**
     * Creates a map cleaned every second, with a default time to live of two minutes.
     *
     * @param initialCapacity initial capacity of the backing map
     */
    public ExpireMap(int initialCapacity) {
        this(initialCapacity, 1, DEFAULT_EXPIRE_SECONDS);
    }

    /**
     * @param period        seconds between cleanup runs
     * @param expireSeconds default time to live, in seconds, for this instance
     */
    public ExpireMap(long period, long expireSeconds) {
        this(DEFAULT_INITIAL_CAPACITY, period, expireSeconds);
    }

    /**
     * @param initialCapacity initial capacity of the backing map
     * @param period          seconds between cleanup runs
     * @param expireSeconds   default time to live, in seconds, for this instance
     */
    public ExpireMap(int initialCapacity, long period, long expireSeconds) {
        this(initialCapacity, 0, period, expireSeconds);
    }

    /**
     * @param delay  seconds before the first cleanup run
     * @param period seconds between cleanup runs
     */
    public ExpireMap(int delay, int period) {
        this(DEFAULT_INITIAL_CAPACITY, delay, period, DEFAULT_EXPIRE_SECONDS);
    }

    /** Common constructor: the default time to live is kept per instance, not in shared static state. */
    private ExpireMap(int initialCapacity, long delay, long period, long expireSeconds) {
        super(initialCapacity);
        this.defaultExpireSeconds = expireSeconds;
        this.executorService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("expiremap-schedule-pool-%d").daemon(true).build());
        this.executorService.scheduleAtFixedRate(new CleanTask<>(this), delay, period, TimeUnit.SECONDS);
    }

    /** Stores the value with this instance's default time to live. */
    @Override
    public synchronized V put(K key, V value) {
        return put(key, value, defaultExpireSeconds, TimeUnit.SECONDS);
    }

    @Override
    public boolean containsKey(Object key) {
        return !checkExpire(key) && super.containsKey(key);
    }

    /** Stores the value with a time to live given in seconds. */
    public synchronized V put(K key, V value, long seconds) {
        return put(key, value, seconds, TimeUnit.SECONDS);
    }

    /**
     * Stores the value with the given time to live.
     *
     * @return the previous live value, or {@code null} if there was none or it had expired
     */
    public synchronized V put(K key, V value, long duration, TimeUnit unit) {
        boolean previousExpired = checkExpire(key);
        // Store the value first: if the map rejects it (null key/value) no expiry is recorded.
        V previous = super.put(key, value);
        expireDic.put(key, System.currentTimeMillis() + unit.toMillis(duration));
        return previousExpired ? null : previous;
    }

    @Override
    public boolean isEmpty() {
        for (K key : super.keySet()) {
            if (!checkExpire(key)) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether at least one non-expired entry holds the given value. */
    @Override
    public boolean containsValue(Object value) {
        if (value == null) {
            return false;
        }
        for (Entry<K, V> entry : super.entrySet()) {
            // Keep scanning past expired matches: another live entry may hold the same value.
            if (value.equals(entry.getValue()) && !checkExpire(entry.getKey())) {
                return true;
            }
        }
        return false;
    }

    /** Removes expired entries, then returns the live values view of the backing map. */
    @Override
    public Collection<V> values() {
        purgeExpired();
        return super.values();
    }

    @Override
    public int size() {
        return entrySet().size();
    }

    /** Returns the value for the key, or {@code null} when the key is null, absent or expired. */
    @Override
    public V get(Object key) {
        if (key == null || checkExpire(key)) {
            return null;
        }
        return super.get(key);
    }

    /** Returns a snapshot set containing only the non-expired entries. */
    @Override
    public Set<Entry<K, V>> entrySet() {
        Set<Entry<K, V>> live = new HashSet<>();
        for (Entry<K, V> entry : super.entrySet()) {
            if (!checkExpire(entry.getKey())) {
                live.add(entry);
            }
        }
        return live;
    }

    /**
     * Removes the entry together with its expiry record.
     *
     * @return the removed live value, or {@code null} if there was none or it had expired
     */
    @Override
    public synchronized V remove(Object key) {
        if (key == null) {
            return null;
        }
        boolean expired = checkExpire(key);
        V previous = super.remove(key);
        expireDic.remove(key);
        return expired ? null : previous;
    }

    /** String-keyed convenience overload kept for existing callers. */
    public void remove(String key) {
        remove((Object) key);
    }

    /** Returns the raw entry count of the backing map, expired entries included. */
    public Integer superSize() {
        return super.size();
    }

    /** Stores every mapping with this instance's default time to live. */
    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        for (Entry<? extends K, ? extends V> e : m.entrySet()) {
            put(e.getKey(), e.getValue(), defaultExpireSeconds);
        }
    }

    /** Stores every mapping with the given time to live in seconds. */
    public void putAllByExpire(Map<? extends K, ? extends V> m, int expire) {
        for (Entry<? extends K, ? extends V> e : m.entrySet()) {
            put(e.getKey(), e.getValue(), expire);
        }
    }

    /** Periodic task that evicts every expired key, whatever its type. */
    private static class CleanTask<K, V> implements Runnable {

        private final ExpireMap<K, V> expireMap;

        CleanTask(ExpireMap<K, V> expireMap) {
            this.expireMap = expireMap;
        }

        @Override
        public void run() {
            try {
                for (K key : expireMap.getExpireDic().keySet()) {
                    if (expireMap.evictIfExpired(key)) {
                        LOGGER.info("缓存KEY[{}]已过期。", key);
                    }
                }
            } catch (RuntimeException e) {
                // An escaping exception would cancel all future runs of a fixed-rate task.
                LOGGER.error("ExpireMap cleanup failed", e);
            }
        }
    }

    @Override
    public synchronized void clear() {
        super.clear();
        expireDic.clear();
    }

    /** Removes all expired entries. */
    private void purgeExpired() {
        for (K key : expireDic.keySet()) {
            evictIfExpired(key);
        }
    }

    /**
     * Removes the entry and its expiry record if it has expired. The check and the removal
     * happen under the same lock as {@code put}, so a concurrently refreshed key is kept.
     */
    private synchronized boolean evictIfExpired(Object key) {
        if (!checkExpire(key)) {
            return false;
        }
        super.remove(key);
        expireDic.remove(key);
        return true;
    }

    /** Returns whether an expiry is recorded for the key and already lies in the past. */
    private synchronized boolean checkExpire(Object key) {
        if (key == null) {
            return false;
        }
        // Single lookup: containsKey followed by get could unbox null if the key vanished in between.
        Long expiryTime = expireDic.get(key);
        return expiryTime != null && System.currentTimeMillis() > expiryTime;
    }

    /** Manual demo: prints the live and raw sizes once per second while entries expire. */
    public static void main(String[] args) throws InterruptedException {
        ExpireMap<String, String> map = new ExpireMap<>(0, 1);
        map.put("aaa", "test", 6, TimeUnit.SECONDS);
        map.put("bbb", "test", 2, TimeUnit.SECONDS);
        map.put("ccc", "test", 3, TimeUnit.SECONDS);
        map.put("ddd", "test", 4, TimeUnit.SECONDS);
        map.put("fff", "test", 5, TimeUnit.SECONDS);

        for (int i = 0; i < 100; i++) {
            Thread.sleep(1000);
            printMap(map);
            System.out.println("expireDic:" + map.expireDic.size());
            System.out.println("map:" + map.superSize());
        }
        Thread.sleep(10000000);
    }

    private static void printMap(ExpireMap<String, String> map) {
        LOGGER.info(">>>>>>>>>>>>>>>>map size:{}<<<<<<<<<<<<<<<<<<<<<", map.size());
    }
}
RejectionMailet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailetcontainer.impl.matchers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;


import org.apache.james.mailetcontainer.impl.matchers.utils.CacheUtil;
import org.apache.mailet.Mail;
import org.apache.mailet.MailetConfig;
import org.apache.mailet.base.GenericMailet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Strings;

/**
* * @author xen
* * @date 2025/3/5
**/
public class RejectionMailet extends GenericMailet {

public static final Logger LOGGER = LoggerFactory.getLogger(RejectionMailet.class);

// Static risk control: block lists replaced as a whole by loadConfiguration()
private volatile List<String> rejectedDomains = new ArrayList<>();// rejected sender domains
private volatile List<String> rejectedFromEmailAddress = new ArrayList<>();// rejected sender addresses
private volatile List<String> rejectedToEmailAddress = new ArrayList<>();// rejected recipient addresses
private volatile List<String> rejectedSenderIPs = new ArrayList<>();// rejected IP addresses of the sending mail server

// Dynamic risk control: null when the parameter is missing or not a number (see parseLongConfig)
private volatile Long ipLimitTime;// lock duration, in seconds
private volatile Long ipRiskControlTime;// risk-control window, in seconds
private volatile Long ipFrequency;// maximum number of requests per IP within the risk-control window

// Runs the periodic configuration reload scheduled in init()
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

/**
 * Reads the configuration once, then schedules a re-read every 30 seconds
 * (first re-read after 30 seconds).
 *
 * @throws MessagingException if the superclass initialisation fails
 */
@Override
public void init() throws MessagingException {
    super.init();
    loadConfiguration();

    final long reloadIntervalSeconds = 30;
    Runnable reloadTask = () -> loadConfiguration();
    scheduler.scheduleAtFixedRate(reloadTask, reloadIntervalSeconds, reloadIntervalSeconds, TimeUnit.SECONDS);
}

private void loadConfiguration() {
MailetConfig config = getMailetConfig();
rejectedDomains = parseConfig(config.getInitParameter("rejectedDomains"));
rejectedFromEmailAddress = parseConfig(config.getInitParameter("rejectedFromEmailAddress"));
rejectedToEmailAddress = parseConfig(config.getInitParameter("rejectedToEmailAddress"));
rejectedSenderIPs = parseConfig(config.getInitParameter("rejectedSenderIPs"));

ipLimitTime = parseLongConfig(config.getInitParameter("ipLimitTime"));
ipRiskControlTime = parseLongConfig(config.getInitParameter("ipRiskControlTime"));
ipFrequency = parseLongConfig(config.getInitParameter("ipFrequency"));

LOGGER.info("james风控动态刷新配置: rejectedDomains={}, rejectedFromEmailAddress={}, rejectedToEmailAddress={}, rejectedSenderIPs={}, ipLimitTime={}, ipRiskControlTime={}, ipFrequency={}",
rejectedDomains, rejectedFromEmailAddress, rejectedToEmailAddress, rejectedSenderIPs, ipLimitTime, ipRiskControlTime, ipFrequency);

}

private List<String> parseConfig(String configValue) {
if (Strings.isNullOrEmpty(configValue)) {
return Collections.emptyList();
}
return Arrays.stream(configValue.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
}

private Long parseLongConfig(String configValue) {
try {
return Long.parseLong(configValue);
} catch (NumberFormatException e) {
LOGGER.warn("Invalid long value for config: {}", configValue);
return null;
}
}

@Override
public void destroy() {
scheduler.shutdown();
super.destroy();
}

@Override
public void service(Mail mail) throws MessagingException {
List<String> from = Arrays.stream(mail.getMessage().getFrom()).map(m -> ((InternetAddress) m).getAddress()).collect(Collectors.toList());

// 获取收件人地址
List<String> to = Arrays.stream(mail.getMessage().getRecipients(Message.RecipientType.TO))
.map(m -> ((InternetAddress) m).getAddress())
.collect(Collectors.toList());

LOGGER.info("发件人:{} 进入自定义拦截逻辑", from);
String rejectionReason = getRejectionReason(from, to, mail.getRemoteAddr());
if (!Strings.isNullOrEmpty(rejectionReason)) {
mail.setState(Mail.GHOST); // 阻止进一步处理
sendRejectionNotification(mail, from, rejectionReason);
}
}

private String getRejectionReason(List<String> from, List<String> to, String senderIP) {
if (checkFormDomain(from)) {
return "Your mail cannot be delivered due to domain restrictions";
} else if (checkFormEmailAddress(from)) {
return "Your email address is restricted and cannot be delivered";
} else if (checkToEmailAddress(to)) {
return "The recipient's email address is restricted and cannot be delivered";
} else if (checkSenderIP(senderIP)) {
return "Your IP address is restricted and cannot send emails to this server";
} else if (dynamicIPRiskControl(senderIP)) {
return "The operation is too frequent and has been locked";
}
return "";
}

private void sendRejectionNotification(Mail originalMail, List<String> recipients, String content) {
try {
LOGGER.info("开始发拒收通知");
MimeMessage notification = new MimeMessage(originalMail.getMessage().getSession());
// 设置发件人为 postmaster,避免被拦截
notification.setFrom(new InternetAddress("postmaster@xx.xx"));
notification.setRecipients(Message.RecipientType.TO,
InternetAddress.parse(String.join(",", recipients)));
notification.setSubject("Mail rejected notification");
notification.setText(content);
getMailetContext().sendMail(notification);
} catch (Exception e) {
LOGGER.error("发送拒收通知失败", e);
}
}

private Boolean checkFormDomain(List<String> from) {
if (rejectedDomains.isEmpty()) {
return false;
}
String regex = rejectedDomains.stream()
.filter(Objects::nonNull)
.map(domain -> Pattern.quote(domain.toLowerCase()))
.collect(Collectors.joining("|", "@(", ")$"));
Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
return from.stream()
.filter(Objects::nonNull)
.anyMatch(email -> {
String lowerEmail = email.toLowerCase();
boolean b = pattern.matcher(lowerEmail).find()
&& !lowerEmail.equals("postmaster@xx.xx");
if (b) {
LOGGER.info("发件人:{} 域被拒绝", from);
}
return b;
});
}


private Boolean checkFormEmailAddress(List<String> from) {
if (rejectedFromEmailAddress.isEmpty()) {
return false;
}
boolean b = from.stream()
.anyMatch(email -> rejectedFromEmailAddress.contains(email) && !"postmaster@xx.xx".equals(email));
if (b) {
LOGGER.info("发件人:{} 邮箱地址被拒绝", from);
}
return b;
}

private Boolean checkToEmailAddress(List<String> to) {
if (rejectedToEmailAddress.isEmpty()) {
return false;
}

LOGGER.info("收件人地址:{}", to);

boolean b = to.stream()
.anyMatch(email -> rejectedToEmailAddress.contains(email) && !"postmaster@xx.xx".equals(email));
if (b) {
LOGGER.info("收件人:{} 邮箱地址被拒绝", to);
}
return b;
}

private Boolean checkSenderIP(String senderIP) {
if (rejectedSenderIPs.isEmpty()) {
return false;
}
LOGGER.info("发件人IP地址:{}", senderIP);

boolean b = rejectedSenderIPs.contains(senderIP);
if (b) {
LOGGER.info("发件人IP:{} 被拒绝", senderIP);
}
return b;
}

private Boolean dynamicIPRiskControl(String senderIP) {
long currentTime = System.currentTimeMillis() / 1000;

// 检查IP是否已被锁定
Object lockExpireTimeObj = CacheUtil.get(senderIP);
if (lockExpireTimeObj != null) {
long lockExpireTime = (Long) lockExpireTimeObj;
if (currentTime < lockExpireTime) {
LOGGER.info("动态风控,IP地址:{},处于锁定状态", senderIP);
return true; // IP处于锁定状态
} else {
LOGGER.info("动态风控,IP地址:{},锁定过期,移除记录", senderIP);
CacheUtil.rem(senderIP); // 锁定过期,移除记录
}
}

String frequencyKey = senderIP + ":frequency";
List<Long> requestTimes;

// 同步块保证同一IP的并发安全
synchronized ((frequencyKey).intern()) {
// 获取请求时间记录
requestTimes = (List<Long>) CacheUtil.get(frequencyKey);
if (requestTimes == null) {
requestTimes = new ArrayList<>();
}

LOGGER.info("动态风控,IP地址:{},频次:{}", senderIP, requestTimes.size());

// 清理超出风控时长的记录
long thresholdTime = currentTime - ipRiskControlTime;
requestTimes.removeIf(time -> time < thresholdTime);

// 判断是否超过频率限制
if (requestTimes.size() >= ipFrequency) {
// 触发锁定,记录锁定时间
long lockTime = currentTime + ipLimitTime;
CacheUtil.put(senderIP, lockTime, ipLimitTime);
CacheUtil.rem(frequencyKey); // 清除频率记录
LOGGER.info("动态风控,IP地址:{},触发锁定", senderIP);
return true;
}

// 添加当前请求时间并更新缓存
requestTimes.add(currentTime);
CacheUtil.put(frequencyKey, requestTimes, ipRiskControlTime);
}
return false;
}
}

拓展三(推荐)

该拓展是在拓展二的基础上改造而来,首先重命名了类为RiskMailet

#### 配置文件修改

因为配置文件不支持动态刷新,运行期间读取到的内容始终是服务启动时的数据

mailetcontainer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<mailet match="RecipientIsLocal" class="org.apache.james.mailetcontainer.impl.matchers.RiskMailet">
<postmaster>postmaster@xx.xx</postmaster>
<domainNameWhitelist>example.com,163.com</domainNameWhitelist>
<fromEmailAddressBlacklist>614164@qq.com,someone@example.com</fromEmailAddressBlacklist>
<toEmailAddressBlacklist>1@xx.xx,another@example.com</toEmailAddressBlacklist>
<ipAddressBlacklist>209.85.166.196,203.0.113.5</ipAddressBlacklist>
<ipLimitTime>60000</ipLimitTime> <!-- 锁定时间,单位:秒 -->
<ipRiskControlTime>3600000</ipRiskControlTime> <!-- 风控时长,单位:秒 -->
<ipFrequency>100</ipFrequency> <!-- 风控时长内的最大请求次数 -->
<dynamicIpWhitelist>example.com,163.com</dynamicIpWhitelist>
<fromLimitTime>60000</fromLimitTime>
<fromRiskControlTime>3600000</fromRiskControlTime>
<fromFrequency>100</fromFrequency>
<dynamicFromWhitelist>example.com,163.com</dynamicFromWhitelist>
<toLimitTime>60000</toLimitTime>
<toRiskControlTime>3600000</toRiskControlTime>
<toFrequency>100</toFrequency>
<dynamicToWhitelist>example.com,163.com</dynamicToWhitelist>
<rateLimitCapacity>60000</rateLimitCapacity>
<rateLimitRefillTokens>3600000</rateLimitRefillTokens>
<rateLimitInterval>100</rateLimitInterval><!-- 单位:秒 -->
</mailet>

json

改用走接口的形式来支持动态刷新,这是接口返回的数据格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
{
"james": {
"risk": {
"riskSwitch": true,
"riskLogSwitch": true,
"refreshInterval": 60,
"host": "xx.xx",
"jamesServerIp": "xx.xx.xx.xx",
"postmaster": "postmaster@xx.xx",
"otherClientSwitch": true,
"notificationLimitTime": 2,
"domainNameWhitelist": [
"gmail.com"
],
"fromEmailAddressBlacklist": [],
"toEmailAddressBlacklist": [],
"fromLimitTime": 3600,
"fromRiskControlTime": 86400,
"fromFrequency": 30,
"dynamicFromWhitelist": [],
"toLimitTime": 3600,
"toRiskControlTime": 86400,
"toFrequency": 100,
"dynamicToWhitelist": [],
"rateLimitCapacity": 20000,
"rateLimitRefillTokens": 50,
"rateLimitInterval": 10
},
"listener": {
"receiptCallback": true
}
}
}

RateLimiter

1
2
3
4
5
6
matchers
-RejectionMailet
-utils
-CacheUtil
-ExpireMap
-RateLimiter

限流用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package org.apache.james.mailetcontainer.impl.matchers.utils;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Process-wide token-bucket rate limiter. Each key (for example an IP or a mail
 * domain) gets its own bucket, created on first use with the limiter's current
 * settings.
 *
 * <p>Thread-safety: buckets live in a {@link ConcurrentHashMap} and every bucket
 * guards its own state with a lock, so all public methods may be called
 * concurrently.
 *
 * @author xen
 * @date 2025/3/6
 */
public class RateLimiter {
    private static RateLimiter instance; // lazily created singleton
    private final Map<String, TokenBucket> cache = new ConcurrentHashMap<>();
    // volatile: written by refreshConfig() and read by whichever thread creates a bucket
    private volatile long maxTokens;      // capacity of every bucket
    private volatile long refillTokens;   // tokens added per refill interval
    private volatile long refillInterval; // refill interval in seconds

    // Serialises configuration changes and cache resets.
    private final Lock lock = new ReentrantLock();

    private RateLimiter(long maxTokens, long refillTokens, long refillInterval) {
        this.maxTokens = maxTokens;
        this.refillTokens = refillTokens;
        this.refillInterval = refillInterval;
    }

    /**
     * Returns the singleton, creating it with the given settings on the first call.
     * On later calls the arguments are ignored; use {@link #refreshConfig} to change
     * the settings of an existing limiter.
     *
     * @param maxTokens bucket capacity
     * @param refillTokens tokens added per refill interval
     * @param refillInterval refill interval in seconds
     * @return the shared limiter
     */
    public static synchronized RateLimiter getInstance(long maxTokens, long refillTokens, long refillInterval) {
        if (instance == null) {
            instance = new RateLimiter(maxTokens, refillTokens, refillInterval);
        }
        return instance;
    }

    /** Returns the bucket for {@code key}, creating a full one with the current settings if absent. */
    private TokenBucket getOrCreateBucket(String key) {
        return cache.computeIfAbsent(key, k -> new TokenBucket(maxTokens, refillTokens, refillInterval));
    }

    /**
     * Tries to take one token from the bucket of {@code key}.
     *
     * @param key bucket key, must not be null
     * @return true if a token was taken, false if the bucket is empty
     */
    public boolean tryConsume(String key) {
        return getOrCreateBucket(key).tryConsume();
    }

    /**
     * Returns the number of tokens currently available for {@code key}
     * (after applying any refill that is due).
     *
     * @param key bucket key, must not be null
     */
    public long getAvailableTokens(String key) {
        return getOrCreateBucket(key).getAvailableTokens();
    }

    /** Discards the bucket of {@code key}; the next access creates a fresh, full bucket. */
    public void resetBucket(String key) {
        cache.remove(key);
    }

    /**
     * Replaces the settings and discards all buckets so that every key starts
     * over with the new configuration.
     *
     * @param maxTokens bucket capacity
     * @param refillTokens tokens added per refill interval
     * @param refillInterval refill interval in seconds
     */
    public void refreshConfig(long maxTokens, long refillTokens, long refillInterval) {
        lock.lock();
        try {
            this.maxTokens = maxTokens;
            this.refillTokens = refillTokens;
            this.refillInterval = refillInterval;
            cache.clear();
        } finally {
            lock.unlock();
        }
    }

    /** Discards all buckets while keeping the current settings. */
    public void clear() {
        lock.lock();
        try {
            cache.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * A single token bucket. Static because it needs nothing from the enclosing
     * limiter; all mutable state is guarded by {@link #lock}.
     */
    private static final class TokenBucket {
        private final long maxTokens;            // bucket capacity
        private final long refillTokens;         // tokens added per interval
        private final long refillIntervalMillis; // interval length in ms; <= 0 disables refilling
        private long currentTokens;              // tokens left
        private long lastRefillTime;             // start of the current interval, epoch ms
        private final Lock lock = new ReentrantLock();

        TokenBucket(long maxTokens, long refillTokens, long refillInterval) {
            this.maxTokens = maxTokens;
            this.refillTokens = refillTokens;
            // Saturate instead of overflowing for absurdly large intervals.
            this.refillIntervalMillis = refillInterval > Long.MAX_VALUE / 1000
                ? Long.MAX_VALUE
                : refillInterval * 1000;
            this.currentTokens = maxTokens;
            this.lastRefillTime = System.currentTimeMillis();
        }

        /** Returns the tokens left after applying any refill that is due. */
        long getAvailableTokens() {
            lock.lock(); // refill() mutates state, so reads need the lock too
            try {
                refill();
                return currentTokens;
            } finally {
                lock.unlock();
            }
        }

        /** Takes one token if available. */
        boolean tryConsume() {
            lock.lock();
            try {
                refill();
                if (currentTokens > 0) {
                    currentTokens--;
                    return true;
                }
                return false;
            } finally {
                lock.unlock();
            }
        }

        /** Adds the tokens for every full interval elapsed since the last refill. Caller holds the lock. */
        private void refill() {
            if (refillIntervalMillis <= 0) {
                // A zero or negative interval would divide by zero below; treat it as "never refill".
                return;
            }
            long elapsed = System.currentTimeMillis() - lastRefillTime;
            if (elapsed < refillIntervalMillis) {
                return;
            }
            long intervals = elapsed / refillIntervalMillis;
            currentTokens = Math.min(maxTokens, currentTokens + intervals * refillTokens);
            // Advance by whole intervals only, so the leftover time counts towards the next refill.
            lastRefillTime += intervals * refillIntervalMillis;
        }
    }

    /** Usage example. */
    public static void main(String[] args) throws InterruptedException {
        // Capacity 10, refilled with 2 tokens every second.
        RateLimiter rateLimiter = RateLimiter.getInstance(10, 2, 1);

        String ipAddress = "192.168.1.1";

        if (rateLimiter.tryConsume(ipAddress)) {
            System.out.println(ipAddress + " 请求被允许");
        } else {
            System.out.println(ipAddress + " 请求被限流");
        }

        System.out.println(ipAddress + " 剩余令牌数量: " + rateLimiter.getAvailableTokens(ipAddress));

        // Wait two seconds so the bucket is refilled before the next attempt.
        Thread.sleep(2000);

        if (rateLimiter.tryConsume(ipAddress)) {
            System.out.println(ipAddress + " 请求被允许");
        } else {
            System.out.println(ipAddress + " 请求被限流");
        }

        // Switch to a new configuration at runtime.
        rateLimiter.refreshConfig(15, 3, 1);

        // Drop the bucket of this IP.
        rateLimiter.resetBucket(ipAddress);
    }
}

RiskMailet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailetcontainer.impl.matchers;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;


import org.apache.james.mailetcontainer.impl.matchers.utils.CacheUtil;
import org.apache.james.mailetcontainer.impl.matchers.utils.RateLimiter;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;

/**
* * @author xen
* * @date 2025/3/5
**/

public class RiskMailet extends GenericMailet {

public static final Logger LOGGER = LoggerFactory.getLogger(RiskMailet.class);


// General risk-control settings

private volatile Boolean riskSwitch = false; // master switch: service() skips all checks while false
private volatile Boolean riskLogSwitch = false; // riskLog() only pushes log entries while true
private volatile Integer refreshInterval = 60;// period of the config refresh task, seconds

private volatile String postmaster;// mailbox used as the sender of rejection notices

private volatile String host; // compared with the sender's mail domain in checkOtherClient()
private volatile String jamesServerIp; // compared with the sending server IP in checkOtherClient()
private volatile Boolean otherClientSwitch = false; // enables the checkOtherClient() check
private volatile Integer notificationLimitTime = 60;// at most one notification per account within this interval (unit presumably seconds -- TODO confirm against CacheUtil)

// Static risk control

private volatile List<String> domainNameWhitelist = new ArrayList<>();// sender domain whitelist
private volatile List<String> fromEmailAddressBlacklist = new ArrayList<>();// sender address blacklist
private volatile List<String> toEmailAddressBlacklist = new ArrayList<>();// recipient address blacklist

// Dynamic risk control

// Sender side
private volatile Long fromLimitTime;// lock duration, seconds
private volatile Long fromRiskControlTime;// length of the risk-control window, seconds
private volatile Long fromFrequency;// maximum number of requests inside the window
private volatile List<String> dynamicFromWhitelist = new ArrayList<>();// whitelist

// Recipient side
private volatile Long toLimitTime;// lock duration, seconds
private volatile Long toRiskControlTime;// length of the risk-control window, seconds
private volatile Long toFrequency;// maximum number of requests inside the window
private volatile List<String> dynamicToWhitelist = new ArrayList<>();// whitelist

// Rate limiting (values are handed to RateLimiter)
private volatile Long rateLimitCapacity;// token bucket capacity
private volatile Long rateLimitRefillTokens;// number of tokens added per refill
private volatile Long rateLimitInterval;// refill interval; RateLimiter multiplies it by 1000 to get milliseconds, i.e. the unit is seconds


private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final HttpClient httpClient = HttpClient.newHttpClient();
private final ObjectMapper objectMapper = new ObjectMapper();

// Handle of the currently scheduled refresh task; replaced when the refresh interval changes
private ScheduledFuture<?> scheduledFuture;

// Endpoint queried for the risk configuration
private static final String CONFIG_API_URL = "http://127.0.0.1:6001/api/mail/config/risk";
// Endpoint that receives risk log entries
private static final String LOG_API_URL = "http://127.0.0.1:6001/api/mail/risk/log";

/**
 * Initialises the mailet and starts the periodic refresh of the risk configuration.
 *
 * <p>The refresh task is scheduled with an initial delay of 0, so the first fetch
 * happens immediately. No separate direct fetch is issued: two simultaneous
 * fetches would parse and apply the configuration concurrently.
 *
 * @throws MessagingException if the superclass initialisation fails
 */
@Override
public void init() throws MessagingException {
    super.init();
    synchronized (this) {
        // Replace a task left over from an earlier init() instead of running two.
        if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(true);
        }
        scheduledFuture = scheduler.scheduleAtFixedRate(this::fetchRiskConfigAsync, 0, refreshInterval, TimeUnit.SECONDS);
    }
}

/**
 * Fetches the risk configuration from {@code CONFIG_API_URL} on a background
 * thread and applies it through {@code parseRiskConfig} when the endpoint
 * answers with HTTP 200. Failures are logged and never propagated, so the
 * previous configuration stays in effect.
 */
private void fetchRiskConfigAsync() {
    CompletableFuture.runAsync(() -> {
        try {
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(CONFIG_API_URL))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();

            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

            if (response.statusCode() == 200) {
                parseRiskConfig(response.body());
            } else {
                LOGGER.error("获取风控配置失败,状态码: {}", response.statusCode());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing pool thread can observe it.
            Thread.currentThread().interrupt();
            LOGGER.error("获取风控配置异常: {}", e.getMessage(), e);
        } catch (Exception e) {
            // Keep the stack trace; the message alone is often null or too terse to diagnose.
            LOGGER.error("获取风控配置异常: {}", e.getMessage(), e);
        }
    });
}

/**
 * Posts one risk log entry to {@code LOG_API_URL} on a background thread.
 * The request body is the JSON object {@code {"type": ..., "content": ...}}.
 * Failures are logged and never propagated.
 *
 * @param type category of the entry, must not be null
 * @param content human-readable description, must not be null
 */
private void fetchRiskLogAsync(String type, String content) {
    CompletableFuture.runAsync(() -> {
        try {
            // Reuse the shared mapper instead of building a new one for every log entry.
            String requestBody = objectMapper.writeValueAsString(Map.of(
                "type", type,
                "content", content
            ));
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(LOG_API_URL))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody))
                .build();

            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

            if (response.statusCode() == 200) {
                LOGGER.info("发送风控日志成功");
            } else {
                LOGGER.error("发送风控日志失败,状态码: {}", response.statusCode());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing pool thread can observe it.
            Thread.currentThread().interrupt();
            LOGGER.error("发送风控日志异常: {}", e.getMessage(), e);
        } catch (Exception e) {
            LOGGER.error("发送风控日志异常: {}", e.getMessage(), e);
        }
    });
}

/**
 * Applies a configuration response to this mailet.
 *
 * <p>The response is expected to carry the configuration as a JSON string under
 * {@code msg.defaultConfigValue}; that string holds {@code james.risk} (risk
 * settings) and {@code james.listener} (listener settings). Keys that are absent
 * or null leave the corresponding setting unchanged.
 *
 * <p>List settings are rebuilt into a new list and then published with a single
 * volatile write, so mail-processing threads never see a cleared or half-filled list.
 *
 * @param responseBody raw HTTP response body
 */
private void parseRiskConfig(String responseBody) {
    try {
        JsonNode envelope = objectMapper.readTree(responseBody);
        JsonNode configValue = envelope == null ? null : envelope.path("msg").get("defaultConfigValue");
        if (configValue == null || configValue.isNull()) {
            LOGGER.error("解析风控配置异常: 响应中缺少 msg.defaultConfigValue");
            return;
        }
        JsonNode jamesConfig = objectMapper.readTree(configValue.asText());
        JsonNode jamesNode = jamesConfig == null ? null : jamesConfig.get("james");
        if (jamesNode == null) {
            LOGGER.error("解析风控配置异常: 配置中缺少 james 节点");
            return;
        }

        // Listener settings are optional and must not block the risk settings.
        JsonNode uIDEventListenerConfig = jamesNode.get("listener");
        if (uIDEventListenerConfig != null && uIDEventListenerConfig.hasNonNull("receiptCallback")) {
            UIDEventListener.receiptCallback = uIDEventListenerConfig.get("receiptCallback").asBoolean();
        }

        JsonNode riskConfig = jamesNode.get("risk");
        if (riskConfig == null) {
            LOGGER.error("解析风控配置异常: 配置中缺少 james.risk 节点");
            return;
        }

        // General settings
        if (riskConfig.hasNonNull("riskSwitch")) {
            this.riskSwitch = riskConfig.get("riskSwitch").asBoolean();
        }
        if (riskConfig.hasNonNull("riskLogSwitch")) {
            this.riskLogSwitch = riskConfig.get("riskLogSwitch").asBoolean();
        }
        if (riskConfig.hasNonNull("refreshInterval")) {
            int newRefreshInterval = riskConfig.get("refreshInterval").asInt();
            if (newRefreshInterval > 0 && newRefreshInterval != refreshInterval) {
                synchronized (this) {
                    this.refreshInterval = newRefreshInterval;
                    if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
                        scheduledFuture.cancel(true);
                    }
                    // The configuration was fetched a moment ago, so the next fetch
                    // is due one full interval from now rather than immediately.
                    scheduledFuture = scheduler.scheduleAtFixedRate(this::fetchRiskConfigAsync, newRefreshInterval, newRefreshInterval, TimeUnit.SECONDS);
                }
            }
        }
        if (riskConfig.hasNonNull("host")) {
            this.host = riskConfig.get("host").asText();
        }
        if (riskConfig.hasNonNull("jamesServerIp")) {
            this.jamesServerIp = riskConfig.get("jamesServerIp").asText();
        }
        if (riskConfig.hasNonNull("postmaster")) {
            this.postmaster = riskConfig.get("postmaster").asText();
        }
        if (riskConfig.hasNonNull("otherClientSwitch")) {
            this.otherClientSwitch = riskConfig.get("otherClientSwitch").asBoolean();
        }
        if (riskConfig.hasNonNull("notificationLimitTime")) {
            this.notificationLimitTime = riskConfig.get("notificationLimitTime").asInt();
        }

        // Static risk control
        if (riskConfig.hasNonNull("domainNameWhitelist")) {
            this.domainNameWhitelist = readStringList(riskConfig.get("domainNameWhitelist"));
        }
        if (riskConfig.hasNonNull("fromEmailAddressBlacklist")) {
            this.fromEmailAddressBlacklist = readStringList(riskConfig.get("fromEmailAddressBlacklist"));
        }
        if (riskConfig.hasNonNull("toEmailAddressBlacklist")) {
            this.toEmailAddressBlacklist = readStringList(riskConfig.get("toEmailAddressBlacklist"));
        }

        // Dynamic risk control
        if (riskConfig.hasNonNull("fromLimitTime")) {
            this.fromLimitTime = riskConfig.get("fromLimitTime").asLong();
        }
        if (riskConfig.hasNonNull("fromRiskControlTime")) {
            this.fromRiskControlTime = riskConfig.get("fromRiskControlTime").asLong();
        }
        if (riskConfig.hasNonNull("fromFrequency")) {
            this.fromFrequency = riskConfig.get("fromFrequency").asLong();
        }
        if (riskConfig.hasNonNull("dynamicFromWhitelist")) {
            this.dynamicFromWhitelist = readStringList(riskConfig.get("dynamicFromWhitelist"));
        }
        if (riskConfig.hasNonNull("toLimitTime")) {
            this.toLimitTime = riskConfig.get("toLimitTime").asLong();
        }
        if (riskConfig.hasNonNull("toRiskControlTime")) {
            this.toRiskControlTime = riskConfig.get("toRiskControlTime").asLong();
        }
        if (riskConfig.hasNonNull("toFrequency")) {
            this.toFrequency = riskConfig.get("toFrequency").asLong();
        }
        if (riskConfig.hasNonNull("dynamicToWhitelist")) {
            this.dynamicToWhitelist = readStringList(riskConfig.get("dynamicToWhitelist"));
        }

        // Rate limiting: only touch the limiter when all three values are present.
        if (riskConfig.hasNonNull("rateLimitCapacity") && riskConfig.hasNonNull("rateLimitRefillTokens") && riskConfig.hasNonNull("rateLimitInterval")) {
            Long newRateLimitCapacity = riskConfig.get("rateLimitCapacity").asLong();
            Long newRateLimitRefillTokens = riskConfig.get("rateLimitRefillTokens").asLong();
            Long newRateLimitInterval = riskConfig.get("rateLimitInterval").asLong();
            if (!newRateLimitCapacity.equals(rateLimitCapacity) ||
                !newRateLimitRefillTokens.equals(rateLimitRefillTokens) ||
                !newRateLimitInterval.equals(rateLimitInterval)) {
                rateLimitCapacity = newRateLimitCapacity;
                rateLimitRefillTokens = newRateLimitRefillTokens;
                rateLimitInterval = newRateLimitInterval;
                LOGGER.info("刷新限流配置");
                RateLimiter rateLimiter = RateLimiter.getInstance(newRateLimitCapacity, newRateLimitRefillTokens, newRateLimitInterval);
                rateLimiter.refreshConfig(newRateLimitCapacity, newRateLimitRefillTokens, newRateLimitInterval);
            }
        } else {
            Long capacity = rateLimitCapacity;
            Long refillTokens = rateLimitRefillTokens;
            Long interval = rateLimitInterval;
            // Nothing to clear until a limiter has been configured; unboxing the nulls would throw.
            if (capacity != null && refillTokens != null && interval != null) {
                LOGGER.info("清空限流配置");
                try {
                    RateLimiter.getInstance(capacity, refillTokens, interval).clear();
                } catch (Exception e) {
                    LOGGER.error("清空限流配置报错:{}", e.getMessage(), e);
                }
            }
        }
        LOGGER.info("风控配置已更新: riskSwitch={},riskLogSwitch={}, refreshInterval={}, host={}, jamesServerIp={}, postmaster={}, " +
                "otherClientSwitch={}, notificationLimitTime={}, domainNameWhitelist={}, fromEmailAddressBlacklist={}, " +
                "toEmailAddressBlacklist={}, fromLimitTime={}, fromRiskControlTime={}, fromFrequency={}, dynamicFromWhitelist={}, " +
                "toLimitTime={}, toRiskControlTime={}, toFrequency={}, dynamicToWhitelist={}, rateLimitCapacity={}, " +
                "rateLimitRefillTokens={}, rateLimitInterval={}",
            riskSwitch, riskLogSwitch, refreshInterval, host, jamesServerIp, postmaster,
            otherClientSwitch, notificationLimitTime, domainNameWhitelist, fromEmailAddressBlacklist, toEmailAddressBlacklist,
            fromLimitTime, fromRiskControlTime, fromFrequency, dynamicFromWhitelist, toLimitTime, toRiskControlTime,
            toFrequency, dynamicToWhitelist, rateLimitCapacity, rateLimitRefillTokens, rateLimitInterval);

    } catch (IOException e) {
        LOGGER.error("解析风控配置异常: {}", e.getMessage(), e);
    }
}

/** Copies the text of every element of a JSON array node into a new list. */
private static List<String> readStringList(JsonNode arrayNode) {
    List<String> values = new ArrayList<>();
    arrayNode.forEach(node -> values.add(node.asText()));
    return values;
}


/**
 * Shuts the refresh scheduler down first, then lets the superclass
 * perform its own teardown.
 */
@Override
public void destroy() {
    // Stop the scheduler before the superclass teardown runs.
    this.scheduler.shutdown();
    super.destroy();
}

/**
 * Applies risk control to one mail. A mail that fails a check is set to
 * {@link Mail#GHOST} and its senders receive a rejection notice.
 *
 * <p>Risk control is skipped when the master switch is off, when the mail has
 * no usable From address, or when the first From address is the postmaster.
 * Any exception is logged and the mail continues unmodified.
 *
 * @param mail the mail being processed
 * @throws MessagingException declared by the mailet contract; not thrown by this implementation
 */
@Override
public void service(Mail mail) throws MessagingException {
    try {
        if (!riskSwitch) {
            LOGGER.info("风控开关没开,跳过风控");
            return;
        }

        List<String> from = addressesOf(mail.getMessage().getFrom());
        if (from.isEmpty()) {
            // Every check below keys on the first sender address, so none can run.
            LOGGER.warn("邮件没有可用的发件人地址,跳过风控");
            return;
        }
        if (!Strings.isNullOrEmpty(postmaster) && postmaster.equals(from.get(0))) {
            LOGGER.info("postmaster账户发的邮件,跳过风控");
            return;
        }

        // getRecipients() returns null when there is no To header (e.g. Cc/Bcc only);
        // such a mail must still go through the checks instead of failing with an NPE.
        List<String> to = addressesOf(mail.getMessage().getRecipients(Message.RecipientType.TO));

        LOGGER.info("发件人:{} 进入自定义拦截逻辑", from);

        String rejectionReason = getRiskReason(from, to, mail.getRemoteAddr());
        if (!Strings.isNullOrEmpty(rejectionReason)) {
            LOGGER.info("走完风控流程,被拦截...");
            mail.setState(Mail.GHOST); // stop further processing of this mail
            sendRejectionNotification(mail, from, rejectionReason);
        } else {
            LOGGER.info("走完风控流程,放行...");
        }
    } catch (Exception e) {
        LOGGER.error("风控处理异常: {}", e.getMessage(), e);
    }
}

/** Extracts the non-null e-mail addresses from a header value that may be null. */
private static List<String> addressesOf(javax.mail.Address[] addresses) {
    if (addresses == null) {
        return new ArrayList<>();
    }
    return Arrays.stream(addresses)
        .filter(address -> address instanceof InternetAddress)
        .map(address -> ((InternetAddress) address).getAddress())
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
}

/**
 * Pushes a risk log entry unless log pushing is disabled or the cache already
 * holds a "notification-" entry for the first sender address.
 *
 * @param from sender addresses; the first one selects the cache entry
 * @param type category of the log entry
 * @param content text of the log entry
 */
private void riskLog(List<String> from, String type, String content) {
    if (!riskLogSwitch) {
        LOGGER.info("风控日志推送未开启,跳过发送风控日志");
        return;
    }

    boolean pushedRecently = CacheUtil.get("notification-" + from.get(0)) != null;
    if (pushedRecently) {
        LOGGER.info("{}时间限制内已经推送过风控日志终止本次推送,避免频繁派发", from.get(0));
        return;
    }

    fetchRiskLogAsync(type, content);
}

/**
 * Runs the risk checks in a fixed order and stops at the first one that trips.
 * Each hit is also reported through {@code riskLog}.
 *
 * @param from sender addresses
 * @param to recipient addresses
 * @param sendServerIp remote address of the sending server
 * @return the rejection text of the first failing check, or "" when all checks pass
 */
private String getRiskReason(List<String> from, List<String> to, String sendServerIp) {

    LOGGER.info("发件服务器ip:{} 进入自定义拦截逻辑", sendServerIp);

    // 1. sender domain whitelist
    if (!checkFormDomain(from)) {
        riskLog(from, "白名单", from.get(0) + "不在邮箱域白名单,发件进来被拒");
        return "Your mail cannot be delivered due to domain restrictions";
    }

    // 2. local domain used from a third-party client
    if (checkOtherClient(from, sendServerIp)) {
        riskLog(from, "第三方客户端", from.get(0) + "使用第三方客户端进行发件被拦截");
        LOGGER.info("风控拦截,使用第三方客户端进行发件");
        return "This operation poses a security risk and has been restricted";
    }

    // 3. sender blacklist
    if (checkFormEmailAddress(from)) {
        riskLog(from, "发件人黑名单", from.get(0) + "发件人黑名单用户发件被拦截");
        return "Your email address is restricted and cannot be delivered";
    }

    // 4. recipient blacklist
    if (checkToEmailAddress(to)) {
        riskLog(from, "收件人黑名单", String.join(",", to) + "收件人黑名单用户收件被拦截");
        return "The recipient's email address is restricted and cannot be delivered:" + String.join(",", to);
    }

    // 5. sender frequency
    if (batchDynamicRiskControl(from, fromLimitTime, fromRiskControlTime, fromFrequency, dynamicFromWhitelist, "发件人")) {
        riskLog(from, "发件人限流", from.get(0) + "发件超额被拦截");
        return "The operation is too frequent and has been locked -f";
    }

    // 6. recipient frequency
    if (batchDynamicRiskControl(to, toLimitTime, toRiskControlTime, toFrequency, dynamicToWhitelist, "收件人")) {
        riskLog(from, "收件人限流", String.join(",", to) + "收件超额被拦截");
        return "The operation is too frequent and has been locked -t:" + String.join(",", to);
    }

    // 7. per-domain token bucket
    if (batchDynamicRiskControl(from)) {
        riskLog(from, "邮箱域限流", getDomainFromEmail(from.get(0)) + "域流量超额被拦截");
        return "The request has been locked due to excessive request attempts";
    }

    return "";
}

/**
 * Applies the per-domain rate limit to the mail domain of every target address.
 * All targets are processed even after one of them has been limited.
 *
 * @param targets e-mail addresses to check
 * @return true if at least one target's domain is rate limited; false for an empty list
 */
public boolean batchDynamicRiskControl(List<String> targets) {
    boolean anyLimited = false;
    for (String address : targets) {
        // "|=" does not short-circuit, so the limiter is consulted for every address.
        anyLimited |= applyDomainOrIpRateLimit(getDomainFromEmail(address));
    }
    return anyLimited;
}

/**
 * Tells whether a target should be subject to a check.
 *
 * @param target IP, host name or e-mail address being checked
 * @param type label of the calling check, used only in the log line
 * @return false for the loopback names "127.0.0.1" / "localhost" and for the
 *         configured postmaster address, true for everything else
 */
public boolean avoidLocalhost(String target, String type) {
    boolean isLocalMachine = "127.0.0.1".equals(target) || "localhost".equals(target);
    if (isLocalMachine) {
        LOGGER.info("{},本机ip,跳过", type);
        return false;
    }

    boolean postmasterConfigured = !Strings.isNullOrEmpty(postmaster);
    if (postmasterConfigured && postmaster.equals(target)) {
        LOGGER.info("{},postmaster账户,跳过", type);
        return false;
    }

    return true;
}

/**
 * Takes one token from the rate-limit bucket of {@code target}.
 *
 * @param target bucket key (mail domain or IP); null when no domain could be derived
 * @return true when the target is rate limited; false when a token was taken or
 *         when the check is skipped (limiter not configured, null target,
 *         loopback address or postmaster)
 */
private boolean applyDomainOrIpRateLimit(String target) {
    // Snapshot the volatile settings so the null check and the unboxing see the same values.
    Long capacity = rateLimitCapacity;
    Long refillTokens = rateLimitRefillTokens;
    Long interval = rateLimitInterval;
    if (capacity == null || refillTokens == null || interval == null) {
        LOGGER.info("未配置限流参数,跳过限流");
        return false;
    }

    if (target == null) {
        // The limiter keeps its buckets in a ConcurrentHashMap, which rejects null keys.
        LOGGER.info("限流目标为空,跳过限流");
        return false;
    }

    if (!avoidLocalhost(target, "限流")) {
        return false;
    }

    RateLimiter rateLimiter = RateLimiter.getInstance(capacity, refillTokens, interval);
    LOGGER.info("目标:{} 剩余令牌数量:{}", target, rateLimiter.getAvailableTokens(target));

    if (rateLimiter.tryConsume(target)) {
        return false; // a token was available
    }
    LOGGER.info("目标:{} 超过限流阈值,触发限制", target);
    return true;
}


/**
 * Mails an HTML rejection notice from the postmaster address to the given
 * recipients, at most once per cache lifetime of the "notification-" entry
 * keyed by the first recipient. Nothing is sent when no postmaster is
 * configured. Failures are logged and never propagated.
 *
 * @param originalMail the rejected mail, used only for its mail session
 * @param recipients addresses that receive the notice
 * @param content body of the notice
 */
private void sendRejectionNotification(Mail originalMail, List<String> recipients, String content) {
    try {
        if (Strings.isNullOrEmpty(postmaster)) {
            LOGGER.info("没有配置通知邮箱,跳过发送拒收通知");
            return;
        }

        String throttleKey = "notification-" + recipients.get(0);
        if (CacheUtil.get(throttleKey) != null) {
            LOGGER.info("{}时间限制内已经发过拒收通知,终止本次通知,避免频繁派发", recipients.get(0));
            return;
        }

        LOGGER.info("开始发拒收通知");
        String recipientHeader = String.join(",", recipients);
        MimeMessage notice = new MimeMessage(originalMail.getMessage().getSession());
        notice.setFrom(new InternetAddress(postmaster));
        notice.setRecipients(Message.RecipientType.TO, InternetAddress.parse(recipientHeader));
        notice.setSubject("Mail rejected notification");
        notice.setText(content, "UTF-8", "html");
        getMailetContext().sendMail(notice);

        // Remember the notice so further ones are suppressed until the entry expires.
        CacheUtil.put(throttleKey, recipients, Long.valueOf(notificationLimitTime));
    } catch (Exception e) {
        LOGGER.error("发送拒收通知失败", e);
    }
}

/**
 * Extracts the lower-cased domain part of a mail address.
 *
 * <p>The domain is everything after the <em>last</em> {@code '@'}, so a quoted local part
 * containing {@code '@'} does not confuse the split.
 *
 * @param email address to inspect; may be {@code null}
 * @return the domain in lower case (empty when the address ends with {@code '@'}),
 *         or {@code null} when {@code email} is {@code null}, empty or has no {@code '@'}
 */
public static String getDomainFromEmail(String email) {
    if (email == null) {
        return null;
    }
    int at = email.lastIndexOf('@');
    if (at < 0) {
        return null;
    }
    // Locale.ROOT keeps the result independent of the JVM default locale: under a Turkish
    // locale a plain toLowerCase() maps 'I' to dotless 'ı', so "MAIL.INFO" would not match
    // "mail.info". Fully qualified so the block needs no additional import.
    return email.substring(at + 1).toLowerCase(java.util.Locale.ROOT);
}

/**
 * Detects a sender that claims the local mail domain but connects from a foreign machine.
 *
 * <p>The check is active only when {@code otherClientSwitch} is on. It looks at the first
 * entry of {@code from}; its domain must equal {@code host}, and the connecting IP must be
 * neither {@code jamesServerIp} nor a loopback identifier.
 *
 * @param from         sender addresses; only the first one is inspected
 * @param sendServerIp IP (or host name) of the connecting client
 * @return {@code true} when the mail should be treated as coming from a third-party
 *         client, {@code false} when the check is disabled, there is no sender, or the
 *         mail is not from such a client
 */
public Boolean checkOtherClient(List<String> from, String sendServerIp) {
    if (!otherClientSwitch) {
        LOGGER.info("未开启第三方客户端发件限制,跳过邮箱客户端校验");
        return false;
    }

    // No sender to inspect: from.get(0) would throw instead of answering the question.
    if (from == null || from.isEmpty()) {
        LOGGER.info("发件人列表为空,跳过邮箱客户端校验");
        return false;
    }

    // getDomainFromEmail always lower-cases, so a case-sensitive comparison would never
    // match a host configured with upper-case letters and the restriction would be bypassed.
    boolean fromLocalDomain = host.equalsIgnoreCase(getDomainFromEmail(from.get(0)));
    boolean fromTrustedIp = jamesServerIp.equals(sendServerIp)
            || "127.0.0.1".equals(sendServerIp)
            || "localhost".equals(sendServerIp);

    return fromLocalDomain && !fromTrustedIp;
}

/**
 * Checks every sender address against the domain whitelist.
 *
 * <p>{@code null} entries in either list are ignored, domains are compared in lower case,
 * and an address equal to {@code postmaster} (ignoring case) always passes.
 *
 * @param from sender addresses to verify
 * @return {@code false} when no whitelist is configured (check skipped); otherwise
 *         {@code true} only if every non-null sender is the postmaster or has a
 *         whitelisted domain
 */
public Boolean checkFormDomain(List<String> from) {
    if (domainNameWhitelist.isEmpty()) {
        LOGGER.info("未配置邮箱域白名单,跳过白名单校验");
        return false;
    }

    Set<String> allowedDomains = domainNameWhitelist.stream()
            .filter(domain -> domain != null)
            .map(domain -> domain.toLowerCase())
            .collect(Collectors.toSet());

    boolean allAllowed = true;
    for (String email : from) {
        if (email == null) {
            continue;
        }
        // The postmaster address is accepted without looking at its domain.
        boolean isPostmaster = !Strings.isNullOrEmpty(postmaster) && postmaster.equalsIgnoreCase(email);
        if (!isPostmaster && !allowedDomains.contains(getDomainFromEmail(email))) {
            allAllowed = false;
            break;
        }
    }

    if (!allAllowed) {
        LOGGER.info("发件域:{} 不在白名单,被拒", from);
    }
    return allAllowed;
}

/**
 * Tells whether any sender address is on the sender blacklist.
 *
 * <p>An address equal to {@code postmaster} is never treated as blacklisted.
 *
 * @param from sender addresses to check
 * @return {@code true} when at least one sender is blacklisted, {@code false} when none
 *         is or when no blacklist is configured
 */
private Boolean checkFormEmailAddress(List<String> from) {
    if (fromEmailAddressBlacklist.isEmpty()) {
        LOGGER.info("未配置发件人黑名单,跳过发件人黑名单校验");
        return false;
    }

    boolean rejected = false;
    for (String email : from) {
        if (!fromEmailAddressBlacklist.contains(email)) {
            continue;
        }
        // A blacklisted entry only counts when it is not the postmaster itself.
        if (Strings.isNullOrEmpty(postmaster) || !postmaster.equals(email)) {
            rejected = true;
            break;
        }
    }

    if (rejected) {
        LOGGER.info("发件人:{} 邮箱地址被拒绝", from);
    }
    return rejected;
}

/**
 * Tells whether any recipient address is on the recipient blacklist.
 *
 * <p>An address equal to {@code postmaster} is never treated as blacklisted.
 *
 * @param to recipient addresses to check
 * @return {@code true} when at least one recipient is blacklisted, {@code false} when
 *         none is or when no blacklist is configured
 */
private Boolean checkToEmailAddress(List<String> to) {
    if (toEmailAddressBlacklist.isEmpty()) {
        LOGGER.info("未配置收件人黑名单,跳过收件人黑名单校验");
        return false;
    }

    LOGGER.info("收件人地址:{}", to);

    boolean rejected = false;
    for (String email : to) {
        if (!toEmailAddressBlacklist.contains(email)) {
            continue;
        }
        // A blacklisted entry only counts when it is not the postmaster itself.
        if (Strings.isNullOrEmpty(postmaster) || !postmaster.equals(email)) {
            rejected = true;
            break;
        }
    }

    if (rejected) {
        LOGGER.info("收件人:{} 邮箱地址被拒绝", to);
    }
    return rejected;
}


/**
 * Runs the dynamic risk control for every target and reports whether any of them is locked.
 *
 * <p>All targets are always evaluated, even after one is found locked, so that each
 * target's request is recorded.
 *
 * @param targets          values to check; an empty list yields {@code false}
 * @param limitTime        lock duration passed through to the per-target check
 * @param riskControlTime  observation window passed through to the per-target check
 * @param frequency        request threshold passed through to the per-target check
 * @param dynamicWhitelist targets exempt from the check
 * @param type             label used in log output
 * @return {@code true} when at least one target is locked
 */
public boolean batchDynamicRiskControl(List<String> targets, Long limitTime, Long riskControlTime,
        Long frequency, List<String> dynamicWhitelist, String type) {

    boolean anyLocked = false;

    for (String target : targets) {
        // The call is the left operand on purpose: it must run for every target and
        // must never be short-circuited away.
        anyLocked = dynamicRiskControl(target, limitTime, riskControlTime, frequency, dynamicWhitelist, type)
                || anyLocked;
    }

    return anyLocked;
}

/**
 * Frequency-based lock for a single target.
 *
 * <p>Each call that is not skipped records the current time (in seconds) under the cache
 * key {@code target + ":frequency"}. Once {@code frequency} recorded calls lie within the
 * last {@code riskControlTime} seconds, the target is locked: an expiry timestamp of
 * {@code now + limitTime} is stored under the key {@code target} and the recorded calls
 * are cleared. While that timestamp is in the future every call reports "locked".
 *
 * <p>The check is skipped (returns {@code false}) when a setting is missing, when
 * {@link #avoidLocalhost(String, String)} exempts the target, or when the target is
 * contained in {@code dynamicWhitelist}.
 *
 * <p>NOTE(review): the third argument of {@code CacheUtil.put} is presumably an entry
 * lifetime in the same unit as these settings - confirm against {@code CacheUtil}.
 *
 * @param target           value being tracked
 * @param limitTime        lock duration, added to the current time in seconds
 * @param riskControlTime  length of the observation window in seconds
 * @param frequency        number of recorded calls within the window that triggers a lock
 * @param dynamicWhitelist targets that are never tracked
 * @param type             label used in log output
 * @return {@code true} when the target is locked or has just been locked
 */
@SuppressWarnings("unchecked")
private Boolean dynamicRiskControl(String target, Long limitTime, Long riskControlTime,
        Long frequency, List<String> dynamicWhitelist, String type) {

    boolean configIncomplete = Strings.isNullOrEmpty(target)
            || limitTime == null
            || riskControlTime == null
            || frequency == null;
    if (configIncomplete) {
        LOGGER.info("{}的动态配置不全,跳过", type);
        return false;
    }

    if (!avoidLocalhost(target, "动态风控")) {
        return false;
    }

    boolean whitelisted = false;
    for (String entry : dynamicWhitelist) {
        if (target.equals(entry)) {
            whitelisted = true;
            break;
        }
    }
    if (whitelisted) {
        LOGGER.info("{}-动态配置{}存在白名单中,跳过", type, target);
        return false;
    }

    long nowSeconds = System.currentTimeMillis() / 1000;

    // An existing entry under the bare target key holds the lock expiry timestamp.
    Object cachedLockExpiry = CacheUtil.get(target);
    if (cachedLockExpiry != null) {
        long lockExpiresAt = (Long) cachedLockExpiry;
        if (nowSeconds < lockExpiresAt) {
            LOGGER.info("动态风控,目标:{},处于锁定状态", target);
            return true;
        }
        // The lock has run out; drop the stale entry and fall through to normal counting.
        LOGGER.info("动态风控,目标:{},锁定过期,移除记录", target);
        CacheUtil.rem(target);
    }

    String frequencyKey = target + ":frequency";

    // The interned key string serves as the monitor, so calls for the same target are serialized.
    synchronized (frequencyKey.intern()) {
        List<Long> recordedCalls = (List<Long>) CacheUtil.get(frequencyKey);
        if (recordedCalls == null) {
            recordedCalls = new ArrayList<>();
        }

        LOGGER.info("动态风控,目标:{},频次:{}", target, recordedCalls.size());

        // Forget calls that have fallen out of the observation window.
        long windowStart = nowSeconds - riskControlTime;
        recordedCalls.removeIf(recordedAt -> recordedAt < windowStart);

        if (recordedCalls.size() >= frequency) {
            // Threshold reached: store the lock expiry and start counting afresh afterwards.
            long lockExpiresAt = nowSeconds + limitTime;
            CacheUtil.put(target, lockExpiresAt, limitTime);
            CacheUtil.rem(frequencyKey);
            LOGGER.info("动态风控,目标:{},触发锁定", target);
            return true;
        }

        // Below the threshold: record this call and write the list back.
        recordedCalls.add(nowSeconds);
        CacheUtil.put(frequencyKey, recordedCalls, riskControlTime);
    }
    return false;
}
}