अद्यतन फ़ाइलों को पहचानने के लिए AbstractInboundFileSynchronizer
अद्यतन करना संभव है, लेकिन यह भंगुर है और आप अन्य मुद्दों में भाग लेते हैं।
अपडेट 13/नवंबर/2016: सेकंड में संशोधन टाइमस्टैम्प कैसे प्राप्त करें, यह पता चला।
AbstractInboundFileSynchronizer
अद्यतन करने के साथ मुख्य समस्या यह है कि इसमें सेटर-विधियां हैं लेकिन कोई (संरक्षित) गेटर-विधियां नहीं हैं। यदि, भविष्य में, सेटर-विधियां कुछ स्मार्ट बनाती हैं, तो यहां प्रस्तुत अद्यतन संस्करण टूट जाएगा।
स्थानीय निर्देशिका में फ़ाइलों को अद्यतन करने के साथ मुख्य समस्या समरूपता है: यदि आप स्थानीय फ़ाइल को एक ही समय में अपडेट कर रहे हैं, तो एक अद्यतन प्राप्त हो रहा है, तो आप सभी प्रकार की परेशानी में भाग सकते हैं। स्थानीय फ़ाइल को एक (अस्थायी) प्रसंस्करण निर्देशिका में स्थानांतरित करना आसान तरीका है ताकि अद्यतन एक नई फ़ाइल के रूप में प्राप्त किया जा सके जो बदले में AbstractInboundFileSynchronizer
को अद्यतन करने की आवश्यकता को हटा देता है। कैमल का टाइमस्टैम्प remarks भी देखें।
डिफ़ॉल्ट रूप से FTP सर्वर मिनटों में संशोधन टाइमस्टैम्प प्रदान करते हैं। परीक्षण के लिए मैंने MLSD कमांड का उपयोग करने के लिए एफ़टीपी क्लाइंट को अपडेट किया जो सेकंड में संशोधन टाइमस्टैम्प प्रदान करता है (और यदि आप भाग्यशाली हैं तो मिलीसेकंड), लेकिन सभी एफ़टीपी सर्वर इसका समर्थन नहीं करते हैं।
जैसा कि Spring FTP reference पर बताया गया है, स्थानीय फ़ाइल फ़िल्टर को FileSystemPersistentAcceptOnceFileListFilter
होना चाहिए ताकि यह सुनिश्चित किया जा सके कि संशोधित टाइमस्टैम्प में स्थानीय फ़ाइलों को उठाया जाता है।
अद्यतन AbstractInboundFileSynchronizer
के मेरे संस्करण के नीचे, मेरे द्वारा उपयोग किए जाने वाले कुछ परीक्षण वर्गों के बाद।
public class FtpUpdatingFileSynchronizer extends FtpInboundFileSynchronizer {
protected final Log logger = LogFactory.getLog(this.getClass());
private volatile Expression localFilenameGeneratorExpression;
private volatile EvaluationContext evaluationContext;
private volatile boolean deleteRemoteFiles;
private volatile String remoteFileSeparator = "/";
private volatile boolean preserveTimestamp;
public FtpUpdatingFileSynchronizer(SessionFactory<FTPFile> sessionFactory) {
super(sessionFactory);
setPreserveTimestamp(true);
}
@Override
public void setLocalFilenameGeneratorExpression(Expression localFilenameGeneratorExpression) {
super.setLocalFilenameGeneratorExpression(localFilenameGeneratorExpression);
this.localFilenameGeneratorExpression = localFilenameGeneratorExpression;
}
@Override
public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
super.setIntegrationEvaluationContext(evaluationContext);
this.evaluationContext = evaluationContext;
}
@Override
public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
super.setDeleteRemoteFiles(deleteRemoteFiles);
this.deleteRemoteFiles = deleteRemoteFiles;
}
@Override
public void setRemoteFileSeparator(String remoteFileSeparator) {
super.setRemoteFileSeparator(remoteFileSeparator);
this.remoteFileSeparator = remoteFileSeparator;
}
@Override
public void setPreserveTimestamp(boolean preserveTimestamp) {
// updated
Assert.isTrue(preserveTimestamp, "for updating timestamps must be preserved");
super.setPreserveTimestamp(preserveTimestamp);
this.preserveTimestamp = preserveTimestamp;
}
@Override
protected void copyFileToLocalDirectory(String remoteDirectoryPath, FTPFile remoteFile, File localDirectory,
Session<FTPFile> session) throws IOException {
String remoteFileName = this.getFilename(remoteFile);
String localFileName = this.generateLocalFileName(remoteFileName);
String remoteFilePath = (remoteDirectoryPath != null
? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
: remoteFileName);
if (!this.isFile(remoteFile)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("cannot copy, not a file: " + remoteFilePath);
}
return;
}
// start update
File localFile = new File(localDirectory, localFileName);
boolean update = false;
if (localFile.exists()) {
if (this.getModified(remoteFile) > localFile.lastModified()) {
this.logger.info("Updating local file " + localFile);
update = true;
} else {
this.logger.info("File already exists: " + localFile);
return;
}
}
// end update
String tempFileName = localFile.getAbsolutePath() + this.getTemporaryFileSuffix();
File tempFile = new File(tempFileName);
OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
try {
session.read(remoteFilePath, outputStream);
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
else {
throw new MessagingException("Failure occurred while copying from remote to local directory", e);
}
} finally {
try {
outputStream.close();
}
catch (Exception ignored2) {
}
}
// updated
if (update && !localFile.delete()) {
throw new MessagingException("Unable to delete local file [" + localFile + "] for update.");
}
if (tempFile.renameTo(localFile)) {
if (this.deleteRemoteFiles) {
session.remove(remoteFilePath);
if (this.logger.isDebugEnabled()) {
this.logger.debug("deleted " + remoteFilePath);
}
}
// updated
this.logger.info("Stored file locally: " + localFile);
} else {
// updated
throw new MessagingException("Unable to rename temporary file [" + tempFile + "] to [" + localFile + "]");
}
if (this.preserveTimestamp) {
localFile.setLastModified(getModified(remoteFile));
}
}
private String generateLocalFileName(String remoteFileName) {
if (this.localFilenameGeneratorExpression != null) {
return this.localFilenameGeneratorExpression.getValue(this.evaluationContext, remoteFileName, String.class);
}
return remoteFileName;
}
}
मैंने उपयोग किए गए कुछ परीक्षण कक्षाओं के बाद। मैंने निर्भरता org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
और org.apache.ftpserver:ftpserver-core:1.0.6
(साथ ही सामान्य लॉगिंग और परीक्षण निर्भरता) का उपयोग किया।
public class TestFtpSync {
static final Logger log = LoggerFactory.getLogger(TestFtpSync.class);
static final String FTP_ROOT_DIR = "target" + File.separator + "ftproot";
// org.apache.ftpserver:ftpserver-core:1.0.6
static FtpServer server;
@BeforeClass
public static void startServer() throws FtpException {
File ftpRoot = new File (FTP_ROOT_DIR);
ftpRoot.mkdirs();
TestUserManager userManager = new TestUserManager(ftpRoot.getAbsolutePath());
FtpServerFactory serverFactory = new FtpServerFactory();
serverFactory.setUserManager(userManager);
ListenerFactory factory = new ListenerFactory();
factory.setPort(4444);
serverFactory.addListener("default", factory.createListener());
server = serverFactory.createServer();
server.start();
}
@AfterClass
public static void stopServer() {
if (server != null) {
server.stop();
}
}
File ftpFile = Paths.get(FTP_ROOT_DIR, "test1.txt").toFile();
File ftpFile2 = Paths.get(FTP_ROOT_DIR, "test2.txt").toFile();
@Test
public void syncDir() {
// org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
try {
ctx.register(FtpSyncConf.class);
ctx.refresh();
PollableChannel msgChannel = ctx.getBean("inputChannel", PollableChannel.class);
for (int j = 0; j < 2; j++) {
for (int i = 0; i < 2; i++) {
storeFtpFile();
}
for (int i = 0; i < 4; i++) {
fetchMessage(msgChannel);
}
}
} catch (Exception e) {
throw new AssertionError("FTP test failed.", e);
} finally {
ctx.close();
cleanup();
}
}
boolean tswitch = true;
void storeFtpFile() throws IOException, InterruptedException {
File f = (tswitch ? ftpFile : ftpFile2);
tswitch = !tswitch;
log.info("Writing message " + f.getName());
Files.write(f.toPath(), ("Hello " + System.currentTimeMillis()).getBytes());
}
Message<?> fetchMessage(PollableChannel msgChannel) {
log.info("Fetching message.");
Message<?> msg = msgChannel.receive(1000L);
if (msg == null) {
log.info("No message.");
} else {
log.info("Have a message: " + msg);
}
return msg;
}
void cleanup() {
delFile(ftpFile);
delFile(ftpFile2);
File d = new File(FtpSyncConf.LOCAL_DIR);
if (d.isDirectory()) {
for (File f : d.listFiles()) {
delFile(f);
}
}
log.info("Finished cleanup");
}
void delFile(File f) {
if (f.isFile()) {
if (f.delete()) {
log.info("Deleted " + f);
} else {
log.error("Cannot delete file " + f);
}
}
}
}
public class MlistFtpSessionFactory extends AbstractFtpSessionFactory<MlistFtpClient> {
@Override
protected MlistFtpClient createClientInstance() {
return new MlistFtpClient();
}
}
public class MlistFtpClient extends FTPClient {
@Override
public FTPFile[] listFiles(String pathname) throws IOException {
return super.mlistDir(pathname);
}
}
@EnableIntegration
@Configuration
public class FtpSyncConf {
private static final Logger log = LoggerFactory.getLogger(FtpSyncConf.class);
public static final String LOCAL_DIR = "/tmp/received";
@Bean(name = "ftpMetaData")
public ConcurrentMetadataStore ftpMetaData() {
return new SimpleMetadataStore();
}
@Bean(name = "localMetaData")
public ConcurrentMetadataStore localMetaData() {
return new SimpleMetadataStore();
}
@Bean(name = "ftpFileSyncer")
public FtpUpdatingFileSynchronizer ftpFileSyncer(
@Qualifier("ftpMetaData") ConcurrentMetadataStore metadataStore) {
MlistFtpSessionFactory ftpSessionFactory = new MlistFtpSessionFactory();
ftpSessionFactory.setHost("localhost");
ftpSessionFactory.setPort(4444);
ftpSessionFactory.setUsername("demo");
ftpSessionFactory.setPassword("demo");
FtpPersistentAcceptOnceFileListFilter fileFilter = new FtpPersistentAcceptOnceFileListFilter(metadataStore, "ftp");
fileFilter.setFlushOnUpdate(true);
FtpUpdatingFileSynchronizer ftpFileSync = new FtpUpdatingFileSynchronizer(ftpSessionFactory);
ftpFileSync.setFilter(fileFilter);
// ftpFileSync.setDeleteRemoteFiles(true);
return ftpFileSync;
}
@Bean(name = "syncFtp")
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "500", maxMessagesPerPoll = "1"))
public MessageSource<File> syncChannel(
@Qualifier("localMetaData") ConcurrentMetadataStore metadataStore,
@Qualifier("ftpFileSyncer") FtpUpdatingFileSynchronizer ftpFileSync) throws Exception {
FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(ftpFileSync);
File receiveDir = new File(LOCAL_DIR);
receiveDir.mkdirs();
messageSource.setLocalDirectory(receiveDir);
messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, "local"));
log.info("Message source bean created.");
return messageSource;
}
@Bean(name = "inputChannel")
public PollableChannel inputChannel() {
QueueChannel channel = new QueueChannel();
log.info("Message channel bean created.");
return channel;
}
}
/**
* Copied from https://github.com/spring-projects/spring-integration-samples/tree/master/basic/ftp/src/test/java/org/springframework/integration/samples/ftp/support
* @author Gunnar Hillert
*
*/
public class TestUserManager extends AbstractUserManager {
private BaseUser testUser;
private BaseUser anonUser;
private static final String TEST_USERNAME = "demo";
private static final String TEST_PASSWORD = "demo";
public TestUserManager(String homeDirectory) {
super("admin", new ClearTextPasswordEncryptor());
testUser = new BaseUser();
testUser.setAuthorities(Arrays.asList(new Authority[] {new ConcurrentLoginPermission(1, 1), new WritePermission()}));
testUser.setEnabled(true);
testUser.setHomeDirectory(homeDirectory);
testUser.setMaxIdleTime(10000);
testUser.setName(TEST_USERNAME);
testUser.setPassword(TEST_PASSWORD);
anonUser = new BaseUser(testUser);
anonUser.setName("anonymous");
}
public User getUserByName(String username) throws FtpException {
if(TEST_USERNAME.equals(username)) {
return testUser;
} else if(anonUser.getName().equals(username)) {
return anonUser;
}
return null;
}
public String[] getAllUserNames() throws FtpException {
return new String[] {TEST_USERNAME, anonUser.getName()};
}
public void delete(String username) throws FtpException {
throw new UnsupportedOperationException("Deleting of FTP Users is not supported.");
}
public void save(User user) throws FtpException {
throw new UnsupportedOperationException("Saving of FTP Users is not supported.");
}
public boolean doesExist(String username) throws FtpException {
return (TEST_USERNAME.equals(username) || anonUser.getName().equals(username)) ? true : false;
}
public User authenticate(Authentication authentication) throws AuthenticationFailedException {
if(UsernamePasswordAuthentication.class.isAssignableFrom(authentication.getClass())) {
UsernamePasswordAuthentication upAuth = (UsernamePasswordAuthentication) authentication;
if(TEST_USERNAME.equals(upAuth.getUsername()) && TEST_PASSWORD.equals(upAuth.getPassword())) {
return testUser;
}
if(anonUser.getName().equals(upAuth.getUsername())) {
return anonUser;
}
} else if(AnonymousAuthentication.class.isAssignableFrom(authentication.getClass())) {
return anonUser;
}
return null;
}
}
अद्यतन 15/नवम्बर/2016: एक्सएमएल-विन्यास पर नोट।
एक्सएमएल तत्व inbound-channel-adapter
सीधे org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser
के माध्यम से FtpInboundFileSynchronizer
से जुड़ा हुआ है FtpNamespaceHandler
के माध्यम से spring-integration-ftp-4.3.5.RELEASE.jar!/META-INF/spring.handlers
के माध्यम से।
xml-custom संदर्भ मार्गदर्शिका के बाद, स्थानीय META-INF/spring.handlers
फ़ाइल में एक कस्टम FtpNamespaceHandler
निर्दिष्ट करने से आपको FtpInboundFileSynchronizer
के बजाय FtpUpdatingFileSynchronizer
का उपयोग करने की अनुमति देनी चाहिए। यह यूनिट-टेस्ट के साथ मेरे लिए काम नहीं करता है और एक उचित समाधान में शायद अतिरिक्त/संशोधित xsd-files बनाने में शामिल होगा ताकि नियमित inbound-channel-adapter
नियमित FtpInboundFileSynchronizer
का उपयोग कर रहा हो और एक विशेष inbound-updating-channel-adapter
FtpUpdatingFileSynchronizer
का उपयोग कर रहा हो। इस उत्तर के लिए इसे ठीक से करने का दायरा थोड़ा सा है।
हालांकि एक त्वरित हैक आपको शुरू कर सकता है। आप अपने स्थानीय प्रोजेक्ट में पैकेज org.springframework.integration.ftp.config
और कक्षा FtpNamespaceHandler
बनाकर डिफ़ॉल्ट FtpNamespaceHandler
को ओवरराइट कर सकते हैं। सामग्री पर गौर करें:
package org.springframework.integration.ftp.config;
public class FtpNamespaceHandler extends org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler {
@Override
public void init() {
System.out.println("Initializing FTP updating file synchronizer.");
// one updated line below, rest copied from original FtpNamespaceHandler
registerBeanDefinitionParser("inbound-channel-adapter", new MyFtpInboundChannelAdapterParser());
registerBeanDefinitionParser("inbound-streaming-channel-adapter",
new FtpStreamingInboundChannelAdapterParser());
registerBeanDefinitionParser("outbound-channel-adapter", new FtpOutboundChannelAdapterParser());
registerBeanDefinitionParser("outbound-gateway", new FtpOutboundGatewayParser());
}
}
package org.springframework.integration.ftp.config;
import org.springframework.integration.file.remote.synchronizer.InboundFileSynchronizer;
import org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser;
public class MyFtpInboundChannelAdapterParser extends FtpInboundChannelAdapterParser {
@Override
protected Class<? extends InboundFileSynchronizer> getInboundFileSynchronizerClass() {
System.out.println("Returning updating file synchronizer.");
return FtpUpdatingFileSynchronizer.class;
}
}
भी एक्सएमएल फ़ाइल को preserve-timestamp="true"
जोड़ने नई IllegalArgumentException: for updating timestamps must be preserved
को रोकने के लिए।
नहीं यह 'AbstractInboundFileSynchronizer' (या' FtpInboundFileSynchronizer') का विस्तार करने और केवल ओवरराइड करने के लिए संभव है 'synchronizeToLocalDirectory' यह एक कॉलबैक अपनी खुद की' copyFiles' विधि का उपयोग करता है पारित करने के लिए? – walen
हे @chuchichaeschtli, बस सोच रहा है कि क्या आप अब तक के उत्तरों पर किसी प्रकार की प्रतिक्रिया देने की योजना बना रहे हैं। इस तरह के एक उच्च बक्षीस सेट करने के लिए अजीब तरह की तरह और फिर थ्रेड को अनदेखा करें, imho:/ – walen
क्षमा करें। बक्षीस सेट करने के बाद मैंने स्प्रिंग्स एफटीपी एडाप्टर को छोड़ने और स्क्रैच से अपना "सिंक्रनाइज़र" लिखने का फैसला किया और अभिव्यक्ति विशेषता का उपयोग करके इसे आमंत्रित किया। –