发布于 

James添加收件监听

前言

这个监听器可以获取到uid

替换依赖脚本

UpdateDependency.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/bin/bash
#
# UpdateDependency.sh
# Replaces one dependency jar of the James distributed app with a freshly
# uploaded build and restarts the server. Run it from the James server root
# directory (the directory that contains james-server-distributed-app.jar).

# Name of the jar to replace
DEP_JAR="james-server-mailetcontainer-impl-3.8.1.jar"
# Absolute path of the log file
NOHUP_OUT="$(pwd)/nohup.out"
# Directory holding the currently installed jar (relative or absolute)
LIB_PATH="james-server-distributed-app.lib"
# Source directory of the new jar and the directory it is moved into
# NOTE(review): LIB_PATH is relative while DEST_DIR is absolute; they only name
# the same directory when the script runs from
# /data/apps/mail/james-server-distributed-app — confirm.
SRC_DIR="/home/kevin"
DEST_DIR="/data/apps/mail/james-server-distributed-app/james-server-distributed-app.lib"

# Refuse to do anything when the new jar is missing; otherwise the steps below
# would stop the server and delete the old jar with nothing to replace it.
if [ ! -f "$SRC_DIR/$DEP_JAR" ]; then
  echo "未找到新的依赖包: $SRC_DIR/$DEP_JAR,已中止,未做任何修改。" >&2
  exit 1
fi

# Stop the running server (executed from the project root)
PID=$(jps -l | grep "james-server-distributed-app.jar" | awk '{print $1}')
if [ -n "$PID" ]; then
  echo "找到进程ID: $PID,正在杀死 james-server-distributed-app.jar 进程..."
  # jps may report several matching JVMs, one PID per line; kill them one by
  # one, because a single quoted "$PID" would be passed as one invalid argument.
  for pid in $PID; do
    kill -9 "$pid"
  done
else
  echo "未找到运行中的 james-server-distributed-app.jar 进程。"
fi

# Remove the old log file
rm -rf "$NOHUP_OUT"

# Remove the previously installed dependency
rm -rf "$LIB_PATH"/"$DEP_JAR"

# Move the new dependency from the source directory into the target directory;
# do not start the server without it.
if ! mv "$SRC_DIR"/"$DEP_JAR" "$DEST_DIR"/; then
  echo "移动依赖包失败: $SRC_DIR/$DEP_JAR -> $DEST_DIR/" >&2
  exit 1
fi

# Start the service in the background
# NOTE(review): output is not redirected to $NOHUP_OUT and nohup is not used,
# although the log file above is cleaned up — confirm whether that is intended.
java -Dworking.directory=. -Dlogback.configurationFile=conf/logback.xml \
  -Djdk.tls.ephemeralDHKeySize=2048 -jar james-server-distributed-app.jar &

将脚本放到James服务器根目录,并赋予执行权限

1
chmod +x UpdateDependency.sh

运行

1
./UpdateDependency.sh

添加监听器

UIDEventListener,注意类文件路径,不是所有模块都可以添加监听器

当前类文件放在server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/impl/matchers目录里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package org.apache.james.mailetcontainer.impl.matchers;


import org.apache.james.events.Event;
import org.apache.james.events.EventListener;
import org.apache.james.events.Group;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.events.MailboxEvents;
import org.apache.james.mailbox.model.MailboxId;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * Mailbox event listener that reports the mailbox id and UID of every message
 * added to a mailbox. In this version the "report" is only a log line written
 * by {@link #sendHttpRequest(String)}.
 */
public class UIDEventListener implements EventListener.ReactiveGroupEventListener {

    /** Event group under which this listener is registered. */
    public static class UIDEventListenerGroup extends Group {

    }

    public static final Logger LOGGER = LoggerFactory.getLogger(UIDEventListener.class);

    private static final String HTTP_ENDPOINT = "http://thirdparty.example.com/api/receive";

    private static final Group GROUP = new UIDEventListenerGroup();

    /** @return the shared group instance of this listener */
    @Override
    public Group getDefaultGroup() {
        return GROUP;
    }

    /** Only {@code MailboxEvents.Added} events are of interest. */
    @Override
    public boolean isHandling(Event event) {
        return event instanceof MailboxEvents.Added;
    }

    /**
     * Runs asynchronously; return {@code ExecutionMode.SYNCHRONOUS} instead if
     * synchronous handling is required.
     */
    @Override
    public ExecutionMode getExecutionMode() {
        return ExecutionMode.ASYNCHRONOUS;
    }

    /**
     * Builds one small JSON document per added message UID and hands it to
     * {@link #sendHttpRequest(String)}. Events of any other type are ignored.
     *
     * @param event the mailbox event delivered by the event bus
     * @return an empty publisher in every case
     */
    @Override
    public Publisher<Void> reactiveEvent(Event event) {
        if (!(event instanceof MailboxEvents.Added)) {
            return Mono.empty();
        }
        MailboxEvents.Added added = (MailboxEvents.Added) event;
        MailboxId mailboxId = added.getMailboxId();
        added.getUids().forEach((MessageUid uid) -> {
            String payload = String.format("{\"mailboxId\": \"%s\", \"uid\": \"%s\"}",
                mailboxId.serialize(), uid.asLong());
            sendHttpRequest(payload);
        });
        return Mono.empty();
    }

    /** Placeholder for the real push: currently just logs the payload. */
    private void sendHttpRequest(String jsonData) {
        LOGGER.info("收件触发: {}", jsonData);
    }
}

修改配置文件

服务器conf目录下的listeners.xml文件,新建或者添加

1
2
3
<!-- Registers the custom UIDEventListener by its fully qualified class name; place this element inside <listeners>. -->
<listener>
<class>org.apache.james.mailetcontainer.impl.matchers.UIDEventListener</class>
</listener>

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
 <?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<!-- Read https://james.apache.org/server/config-listeners.html for further details -->

<listeners>
<executeGroupListeners>true</executeGroupListeners>
<listener>
<class>org.apache.james.mailbox.cassandra.MailboxOperationLoggingListener</class>
</listener>

<listener>
<class>org.apache.james.mailetcontainer.impl.matchers.UIDEventListener</class>
</listener>


<!-- Enable to populate JMAP EmailQueryView -->
<!--
<listener>
<class>org.apache.james.jmap.event.PopulateEmailQueryViewListener</class>
<async>true</async>
</listener>
-->

<listener>
<class>org.apache.james.rspamd.RspamdListener</class>
</listener>
<!--
<listener>
<class>org.apache.james.spamassassin.SpamAssassinListener</class>
<async>true</async>
</listener>-->
</listeners>

重启James

1
./bin/james-server.sh stop
1
./bin/james-server.sh start

或者jps找到james的进程再kill -9 杀掉进程

1
2
3
4
5
6
7
8
9
10
root@touchs-test:/data/apps/mail# jps
1092272 touchs.jar
1110035 Jps
24307 agent-2.11.11.jar
682496 james-server-distributed-app.jar
72491 OpenSearch
72492 PerformanceAnalyzerApp
240781 server-2.11.11.jar
root@touchs-test:/data/apps/mail# kill -9 682496
root@touchs-test:/data/apps/mail#
1
java -Dworking.directory=. -Dlogback.configurationFile=conf/logback.xml -Djdk.tls.ephemeralDHKeySize=2048 -jar james-server-distributed-app.jar &

或者jar包上传后直接用上面的脚本处理

拓展

收件事件触发后,异步调用第三方接口

接收端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import com.touchsmail.common.domain.AjaxResult;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Receiving side of the mail hook: exposes the HTTP endpoint that the James
 * listener posts to.
 */
@Slf4j
@RestController
@RequestMapping("/api/mail/hook")
public class EmailHookController {

    /**
     * Callback invoked by the mail server when a message has been received.
     *
     * @param param raw request body as sent by the caller
     * @return a success result wrapping the unchanged request body
     */
    @ApiOperation(value = "邮箱服务器接收邮件回调")
    @PostMapping("/receive")
    public AjaxResult receiveEmails(@RequestBody String param) {
        final AjaxResult echoed = AjaxResult.success(param);
        return echoed;
    }

}

推送端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
package org.apache.james.mailetcontainer.impl.matchers;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import javax.inject.Inject;
import javax.mail.Address;
import javax.mail.BodyPart;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Multipart;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

import org.apache.commons.lang3.StringUtils;
import org.apache.james.events.Event;
import org.apache.james.events.EventListener;
import org.apache.james.events.Group;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.events.MailboxEvents;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.FetchGroup;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageRange;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;


/**
 * Mailbox event listener that pushes every newly added message to an HTTP hook.
 *
 * <p>For each UID of a {@code MailboxEvents.Added} event the full message is
 * fetched, parsed as a {@link MimeMessage}, flattened into a JSON document
 * (envelope fields plus HTML body with inline images embedded as data URIs)
 * and POSTed asynchronously to {@link #HOOK_URL}.
 *
 * <p>Pushing is gated by {@link #receiptCallback}, which defaults to
 * {@code false}: nothing is sent until some other code sets it to {@code true}.
 */
public class UIDEventListener implements EventListener.ReactiveGroupEventListener {

    /** Event group under which this listener is registered. */
    public static class UIDEventListenerGroup extends Group {
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(UIDEventListener.class);

    private static final Group GROUP = new UIDEventListenerGroup();

    /** Endpoint that receives the notification. */
    private static final String HOOK_URL = "http://127.0.0.1:6001/api/mail/hook/receive";

    /** Body placeholder used when no supported content could be extracted. */
    private static final String UNKNOWN_CONTENT = "unknown content format";

    /** MIME type assumed for an inline image whose Content-Type is unusable. */
    private static final String DEFAULT_IMAGE_TYPE = "image/jpeg";

    // Both objects are reused across requests instead of being rebuilt for
    // every single message.
    private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Global switch for the hook; {@code true} enables pushing. */
    public static volatile Boolean receiptCallback = false;

    private final MailboxManager mailboxManager;

    @Inject
    public UIDEventListener(MailboxManager mailboxManager) {
        this.mailboxManager = mailboxManager;
    }

    /**
     * Processes every UID of an {@code Added} event; other events and a
     * disabled switch result in an empty publisher.
     *
     * @param event the mailbox event delivered by the event bus
     * @return a publisher that completes once all UIDs were handled; failures
     *         are logged and never propagated
     */
    @Override
    public Publisher<Void> reactiveEvent(Event event) {
        // Boolean.TRUE.equals also covers the field having been set to null.
        if (!Boolean.TRUE.equals(receiptCallback)) {
            LOGGER.info("收件回调未打开");
            return Mono.empty();
        }
        if (!(event instanceof MailboxEvents.Added)) {
            return Mono.empty();
        }

        MailboxEvents.Added addedEvent = (MailboxEvents.Added) event;
        MailboxSession session = mailboxManager.createSystemSession(event.getUsername());

        return Flux.fromIterable(addedEvent.getUids())
            .flatMap(uid -> processMessage(addedEvent.getMailboxId(), uid, session))
            .onErrorResume(e -> {
                LOGGER.error("Message processing failed", e);
                return Mono.empty();
            })
            .then();
    }

    /**
     * Fetches, parses and pushes a single message. Any failure is logged and
     * swallowed so that one bad message does not affect the others.
     */
    private Mono<Void> processMessage(MailboxId mailboxId, MessageUid uid, MailboxSession session) {
        return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxId, session))
            .flatMap(messageManager -> {
                String folderName = extractFolderName(messageManager, mailboxId);
                return fetchMessageContent(messageManager, uid, session)
                    .flatMap(this::parseMimeMessage)
                    .flatMap(mimeMessage -> sendNotification(mailboxId, uid, mimeMessage, folderName));
            })
            .doOnError(e -> LOGGER.warn("Failed to process UID:{} in mailbox:{}",
                uid.asLong(), mailboxId.serialize(), e))
            .onErrorResume(e -> Mono.empty());
    }

    /** Adapts {@link #sendAsyncHttpRequest} to the reactive chain. */
    private Mono<Void> sendNotification(MailboxId mailboxId, MessageUid uid,
                                        MimeMessage mimeMessage, String folderName) {
        try {
            sendAsyncHttpRequest(mimeMessage, mailboxId, uid, folderName);
            return Mono.empty();
        } catch (Exception e) {
            return Mono.error(e);
        }
    }

    /** Returns the mailbox (folder) name, or {@code "unknown_folder"} when it cannot be read. */
    private String extractFolderName(MessageManager messageManager, MailboxId mailboxId) {
        try {
            return messageManager.getMailboxPath().getName();
        } catch (Exception e) {
            LOGGER.warn("Cannot get folder name for mailboxId:{}", mailboxId, e);
            return "unknown_folder";
        }
    }

    /** Reads the complete raw message into memory. */
    private Mono<byte[]> fetchMessageContent(MessageManager manager, MessageUid uid, MailboxSession session) {
        return Mono.from(manager.getMessagesReactive(MessageRange.one(uid),
                FetchGroup.FULL_CONTENT, session))
            // Reading the stream blocks, so leave the caller's thread first.
            .publishOn(Schedulers.boundedElastic())
            .handle((messageResult, sink) -> {
                try (InputStream is = messageResult.getFullContent().getInputStream()) {
                    sink.next(is.readAllBytes());
                } catch (MailboxException | IOException e) {
                    sink.error(new RuntimeException(e));
                }
            });
    }

    /** Parses raw message bytes without a mail session. */
    private Mono<MimeMessage> parseMimeMessage(byte[] content) {
        return Mono.fromCallable(() ->
            new MimeMessage(null, new ByteArrayInputStream(content)));
    }

    /**
     * Serializes the message to JSON and POSTs it without waiting for the
     * response; the outcome is only logged.
     *
     * @throws MessagingException if message headers cannot be read
     * @throws IOException if the request body cannot be serialized
     */
    private void sendAsyncHttpRequest(MimeMessage message, MailboxId mailboxId, MessageUid uid, String folderName) throws MessagingException, IOException {
        String requestBody = OBJECT_MAPPER.writeValueAsString(
            buildRequestBody(message, mailboxId, uid, folderName));
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(HOOK_URL))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(requestBody))
            .build();

        CompletableFuture<HttpResponse<String>> futureResponse =
            HTTP_CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString());
        futureResponse.thenAccept(response -> {
            int statusCode = response.statusCode();
            if (statusCode == 200) {
                LOGGER.info("收件钩子:推送成功");
            } else {
                LOGGER.error("收件钩子:邮件推送失败,状态码: {} ,响应内容: {} ", statusCode, response.body());
            }
        }).exceptionally(ex -> {
            // Trailing throwable argument keeps the stack trace in the log.
            LOGGER.error("收件钩子:邮件推送时发生异常: {}", ex.toString(), ex);
            return null;
        });
    }

    /**
     * Collects the fields of the notification payload.
     *
     * @return map with keys subject, from, to, cc, bcc, mailboxId, folderName,
     *         uid, messageID, date and content
     */
    private Map<String, Object> buildRequestBody(MimeMessage message, MailboxId mailboxId, MessageUid uid, String folderName) throws MessagingException, IOException {
        Map<String, Object> emailData = new HashMap<>();
        // Body extraction is started first and joined at the end.
        CompletableFuture<Object> futureResult = handleRelatedContentAsync(message);

        emailData.put("subject", message.getSubject());
        // Every address header may be absent (null), including From.
        emailData.put("from", toAddressList(message.getFrom()));
        emailData.put("to", toAddressList(message.getRecipients(Message.RecipientType.TO)));
        emailData.put("cc", toAddressList(message.getRecipients(Message.RecipientType.CC)));
        emailData.put("bcc", toAddressList(message.getRecipients(Message.RecipientType.BCC)));

        emailData.put("mailboxId", mailboxId.serialize());
        // Stored raw: the JSON serializer escapes strings itself, so escaping
        // here as well would deliver a doubly escaped folder name.
        emailData.put("folderName", folderName);
        emailData.put("uid", uid.asLong());

        emailData.put("messageID", message.getMessageID());

        // A message parsed from a stream has no received date, so fall back to
        // the Date header instead of always sending null.
        emailData.put("date", message.getReceivedDate() != null
            ? message.getReceivedDate()
            : message.getSentDate());
        emailData.put("content", futureResult.join());
        return emailData;
    }

    /** Converts an address array (possibly {@code null}) to plain address strings. */
    private static List<String> toAddressList(Address[] addresses) {
        if (addresses == null) {
            return new ArrayList<>();
        }
        return Arrays.stream(addresses)
            .map(address -> address instanceof InternetAddress
                ? ((InternetAddress) address).getAddress()
                : address.toString())
            .collect(Collectors.toList());
    }

    /**
     * Extracts the displayable body: the text itself for single-part text
     * messages, otherwise the collected HTML with inline images embedded.
     * Yields {@code "error"} when extraction throws.
     */
    private CompletableFuture<Object> handleRelatedContentAsync(Message message) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                if (message.isMimeType("text/plain") || message.isMimeType("text/html")) {
                    return message.getContent().toString();
                } else if (message.isMimeType("multipart/*")) {
                    Multipart multipart = (Multipart) message.getContent();
                    StringBuilder htmlContent = new StringBuilder();
                    Map<String, String> imageMap = new HashMap<>();
                    buildMultipartResout(multipart, htmlContent, imageMap);
                    // Image substitution and the empty-body fallback run once,
                    // after the whole tree was walked, so a nested part without
                    // HTML can no longer inject the placeholder into the body.
                    String content = getContent(htmlContent, imageMap);
                    return StringUtils.isEmpty(content) ? UNKNOWN_CONTENT : content;
                }
                return UNKNOWN_CONTENT;
            } catch (Exception e) {
                LOGGER.error("Error processing related content", e);
                return "error";
            }
        });
    }

    /**
     * Walks a multipart container, appending HTML to {@code htmlContent} and
     * registering inline images in {@code imageMap} (Content-ID to data URI).
     */
    private void buildMultipartResout(Multipart multipart, StringBuilder htmlContent, Map<String, String> imageMap) throws Exception {
        for (int i = 0; i < multipart.getCount(); i++) {
            BodyPart part = multipart.getBodyPart(i);
            // isMimeType is used throughout so that the check does not depend
            // on how the sender capitalised the Content-Type.
            if (part.isMimeType("multipart/related")) {
                buildMultipartResout((Multipart) part.getContent(), htmlContent, imageMap);
            } else if (part.isMimeType("multipart/*")) {
                processMultipartAlternative(part, htmlContent, imageMap);
            } else if (part.isMimeType("image/*")) {
                processImg(part, imageMap);
            } else if (part.isMimeType("text/html")) {
                htmlContent.append(part.getContent().toString());
            } else {
                // Probably an attachment.
                LOGGER.warn("其他部分 ContentType: {}", part.getContentType());
            }
        }
    }

    /** Handles a nested multipart part; every HTML sub-part is followed by {@code <br/>}. */
    private void processMultipartAlternative(BodyPart part, StringBuilder htmlContent, Map<String, String> imageMap) throws Exception {
        Object partContent = part.getContent();
        if (!(partContent instanceof Multipart)) {
            LOGGER.warn("multipart/ALTERNATIVE 的内容不是 Multipart 类型");
            return;
        }

        Multipart alternative = (Multipart) partContent;
        for (int i = 0; i < alternative.getCount(); i++) {
            BodyPart subPart = alternative.getBodyPart(i);

            if (subPart.isMimeType("text/html")) {
                htmlContent.append(subPart.getContent()).append("<br/>");
            } else if (subPart.isMimeType("image/*")) {
                processImg(subPart, imageMap);
            } else if (subPart.isMimeType("multipart/alternative") || subPart.isMimeType("multipart/related")) {
                processMultipartAlternative(subPart, htmlContent, imageMap);
            } else {
                LOGGER.warn("[processMultipartAlternative]其他部分 ContentType: {}", subPart.getContentType());
            }
        }
    }

    /** Replaces every {@code cid:<content-id>} reference with the image's data URI. */
    private static String getContent(StringBuilder htmlContent, Map<String, String> imageMap) {
        String content = htmlContent.toString();
        for (Map.Entry<String, String> entry : imageMap.entrySet()) {
            content = content.replace("cid:" + entry.getKey(), entry.getValue());
        }
        return content;
    }

    /** Registers an inline image under its Content-ID; parts without one are skipped. */
    private void processImg(BodyPart part, Map<String, String> imageMap) throws MessagingException, IOException {
        String[] contentIds = part.getHeader("Content-ID");
        if (contentIds == null || contentIds.length == 0) {
            return;
        }
        String contentId = contentIds[0].replaceAll("[<>]", "").trim();
        // The data URI carries the part's own image type instead of a fixed
        // image/jpeg label.
        imageMap.put(contentId, "data:" + imageMimeType(part) + ";base64," + convertImageToBase64(part));
    }

    /** Returns the part's base MIME type (parameters stripped) when it is an image type. */
    private static String imageMimeType(BodyPart part) throws MessagingException {
        String contentType = part.getContentType();
        if (contentType == null) {
            return DEFAULT_IMAGE_TYPE;
        }
        int parametersStart = contentType.indexOf(';');
        String baseType = (parametersStart >= 0 ? contentType.substring(0, parametersStart) : contentType)
            .trim()
            .toLowerCase(java.util.Locale.ROOT);
        boolean usable = baseType.startsWith("image/") && baseType.length() > "image/".length();
        return usable ? baseType : DEFAULT_IMAGE_TYPE;
    }

    /** Base64-encodes the decoded content of a body part. */
    private String convertImageToBase64(BodyPart part) throws MessagingException, IOException {
        try (InputStream is = part.getInputStream()) {
            return Base64.getEncoder().encodeToString(is.readAllBytes());
        }
    }

    @Override
    public ExecutionMode getExecutionMode() {
        return ExecutionMode.ASYNCHRONOUS;
    }

    /** Only {@code MailboxEvents.Added} events are handled. */
    @Override
    public boolean isHandling(Event event) {
        return event instanceof MailboxEvents.Added;
    }

    @Override
    public Group getDefaultGroup() {
        return GROUP;
    }
}

编译

依赖注意顺序,否则编译会报错,不知道顺序可以问一下ai

在该模块的根目录下编译,编译好之后把jar包放到服务器james的依赖目录(james-server-distributed-app.lib)里面,然后重启James

1
2
3
edy@EDYs-MacBook-Air mailetcontainer-impl % pwd
/Users/edy/Documents/ProjectCode/James382/JamesByKre/server/mailet/mailetcontainer-impl
edy@EDYs-MacBook-Air mailetcontainer-impl % mvn clean install -Dmaven.test.skip=true -e
1
mvn clean install -Dmaven.test.skip=true -e