1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
ScheduledExecutorService executorService1 = Executors.newScheduledThreadPool(1); executorService1.scheduleWithFixedDelay(() -> { try { ObjectMapper objectMapper = new ObjectMapper(); List<Message> messages = sqsHelper.receiveMessages(queueUrl, 9); List<S3EventPojo> s3EventList = new ArrayList<>(); for (int i = 0; i < messages.size(); i++) { Message msg = messages.get(i); String body = msg.body(); JsonNode jsonNode = objectMapper.readTree(body); for (JsonNode recordNode : jsonNode.path("Records")) { String eventName = recordNode.path("eventName").asText(); String bucketName = recordNode.path("s3"). path("bucket").path("name").asText(); String key = recordNode.path("s3") .path("object").path("key").asText(); s3EventList.add(new S3EventPojo(eventName, key, bucketName)); } }
s3EventList.forEach(el -> { Path tmpFilePath = null; try { tmpFilePath = myS3ClientHelper.downloadS3FileToLocal(el.getBucketName(), el.getKey(), localDownloadTmpDir); kafkaMsgHandler.readFileAndSendMsg(el.getKey(),tmpFilePath); logger.info("compete read_and_send_msg to kafka,bucket:{}, key:{}", el.getBucketName(), el.getKey()); } catch (IOException e) { logger.error("read_and_send_msg error,cause:", e); } finally { if (Objects.nonNull(tmpFilePath)) { try { Files.deleteIfExists(tmpFilePath); } catch (IOException e) { logger.error("delete file error,cause:", e); } } } }); sqsHelper.deleteMessagesBatch(queueUrl, messages); } catch (Exception e) { logger.error("execute get msg and read data failed.cause:", e); } }, 0, 100, TimeUnit.MILLISECONDS);
|